implemented methods for calculated field update/delete in cluster service

This commit is contained in:
IrynaMatveieva 2024-11-13 17:42:55 +02:00
parent 4d8b62eb21
commit 3072861a8f
12 changed files with 260 additions and 9 deletions

View File

@ -369,6 +369,10 @@
<groupId>com.google.firebase</groupId>
<artifactId>firebase-admin</artifactId>
</dependency>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.TbResourceInfo;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.id.DeviceId;
@ -118,7 +119,11 @@ public class EntityStateSourcingListener {
ApiUsageState apiUsageState = (ApiUsageState) event.getEntity();
tbClusterService.onApiStateChange(apiUsageState, null);
}
default -> {}
case CALCULATED_FIELD -> {
onCalculatedFieldUpdate(event.getEntity(), event.getOldEntity());
}
default -> {
}
}
}
@ -149,7 +154,8 @@ public class EntityStateSourcingListener {
case RULE_CHAIN -> {
RuleChain ruleChain = (RuleChain) event.getEntity();
if (RuleChainType.CORE.equals(ruleChain.getType())) {
Set<RuleChainId> referencingRuleChainIds = JacksonUtil.fromString(event.getBody(), new TypeReference<>() {});
Set<RuleChainId> referencingRuleChainIds = JacksonUtil.fromString(event.getBody(), new TypeReference<>() {
});
if (referencingRuleChainIds != null) {
referencingRuleChainIds.forEach(referencingRuleChainId ->
tbClusterService.broadcastEntityStateChangeEvent(tenantId, referencingRuleChainId, ComponentLifecycleEvent.UPDATED));
@ -177,7 +183,12 @@ public class EntityStateSourcingListener {
TbResourceInfo tbResource = (TbResourceInfo) event.getEntity();
tbClusterService.onResourceDeleted(tbResource, null);
}
default -> {}
case CALCULATED_FIELD -> {
CalculatedField calculatedField = (CalculatedField) event.getEntity();
tbClusterService.onCalculatedFieldDeleted(tenantId, calculatedField, null);
}
default -> {
}
}
}
@ -247,6 +258,15 @@ public class EntityStateSourcingListener {
}
}
private void onCalculatedFieldUpdate(Object entity, Object oldEntity) {
CalculatedField calculatedField = (CalculatedField) entity;
CalculatedField oldCalculatedField = null;
if (oldEntity instanceof CalculatedField) {
oldCalculatedField = (CalculatedField) oldEntity;
}
tbClusterService.onCalculatedFieldUpdated(calculatedField, oldCalculatedField);
}
private void pushAssignedFromNotification(Tenant currentTenant, TenantId newTenantId, Device assignedDevice) {
String data = JacksonUtil.toString(JacksonUtil.valueToTree(assignedDevice));
if (data != null) {

View File

@ -29,6 +29,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.AttributeScope;
@ -64,6 +65,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
@ -71,6 +73,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.thingsboard.server.dao.service.Validator.validateEntityId;
@ -83,6 +86,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
private final CalculatedFieldService calculatedFieldService;
private final AttributesService attributesService;
private final TimeseriesService timeseriesService;
private final RocksDBService rocksDBService;
private ListeningScheduledExecutorService scheduledExecutor;
private ListeningExecutorService calculatedFieldExecutor;
@ -212,6 +216,10 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
calculatedFieldLinks.remove(calculatedFieldId);
calculatedFields.remove(calculatedFieldId);
states.keySet().removeIf(ctxId -> ctxId.startsWith(calculatedFieldId.getId().toString()));
List<String> statesToRemove = states.keySet().stream()
.filter(key -> key.startsWith(calculatedFieldId.getId().toString()))
.collect(Collectors.toList());
rocksDBService.deleteAll(statesToRemove);
} catch (Exception e) {
log.trace("Failed to delete calculated field.", e);
callback.onFailure(e);
@ -223,7 +231,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
cfs.forEach(cf -> calculatedFields.putIfAbsent(cf.getId(), cf));
PageDataIterable<CalculatedFieldLink> cfls = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFieldLinks, initFetchPackSize);
cfls.forEach(link -> calculatedFieldLinks.computeIfAbsent(link.getCalculatedFieldId(), id -> new ArrayList<>()).add(link));
// TODO: read all states(CalculatedFieldCtx)
rocksDBService.getAll().forEach((ctxId, ctx) -> states.put(ctxId, JacksonUtil.convertValue(ctx, CalculatedFieldCtx.class)));
states.keySet().removeIf(ctxId -> calculatedFields.keySet().stream().noneMatch(id -> ctxId.startsWith(id.toString())));
}
@ -319,6 +327,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
}
calculatedFieldCtx.setState(state);
states.put(ctxId, calculatedFieldCtx);
rocksDBService.put(ctxId, Objects.requireNonNull(JacksonUtil.toString(calculatedFieldCtx)));
}
private String performCalculation(Map<String, String> argumentValues, CalculatedFieldConfiguration calculatedFieldConfiguration) {

View File

@ -0,0 +1,95 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.entitiy.cf;
import lombok.extern.slf4j.Slf4j;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.springframework.stereotype.Service;
import org.thingsboard.server.utils.RocksDBConfig;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
@Slf4j
public class RocksDBService {
private final RocksDB db;
private final WriteOptions writeOptions;
public RocksDBService(RocksDBConfig config) throws RocksDBException {
this.db = config.getDb();
this.writeOptions = new WriteOptions().setSync(true);
}
public void put(String key, String value) {
try {
db.put(writeOptions, key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
} catch (RocksDBException e) {
log.error("Failed to store data to RocksDB", e);
}
}
public void delete(String key) {
try {
db.delete(writeOptions, key.getBytes(StandardCharsets.UTF_8));
} catch (RocksDBException e) {
log.error("Failed to delete data from RocksDB", e);
}
}
public void deleteAll(List<String> keys) {
try (WriteBatch batch = new WriteBatch()) {
for (String key : keys) {
batch.delete(key.getBytes(StandardCharsets.UTF_8));
}
db.write(writeOptions, batch);
} catch (RocksDBException e) {
log.error("Failed to delete data from RocksDB", e);
}
}
public String get(String key) {
try {
byte[] value = db.get(key.getBytes(StandardCharsets.UTF_8));
return value != null ? new String(value, StandardCharsets.UTF_8) : null;
} catch (RocksDBException e) {
log.error("Failed to retrieve data from RocksDB", e);
return null;
}
}
public Map<String, String> getAll() {
Map<String, String> map = new HashMap<>();
try (RocksIterator iterator = db.newIterator()) {
for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
String key = new String(iterator.key(), StandardCharsets.UTF_8);
String value = new String(iterator.value(), StandardCharsets.UTF_8);
map.put(key, value);
}
} catch (Exception e) {
log.error("Failed to retrieve data from RocksDB", e);
}
return map;
}
}

View File

@ -38,10 +38,12 @@ import org.thingsboard.server.common.data.TbResourceInfo;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.AssetProfileId;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EdgeId;
@ -666,7 +668,8 @@ public class DefaultTbClusterService implements TbClusterService {
private void pushDeviceUpdateMessage(TenantId tenantId, EdgeId edgeId, EntityId entityId, EdgeEventActionType action) {
log.trace("{} Going to send edge update notification for device actor, device id {}, edge id {}", tenantId, entityId, edgeId);
switch (action) {
case ASSIGNED_TO_EDGE -> pushMsgToCore(new DeviceEdgeUpdateMsg(tenantId, new DeviceId(entityId.getId()), edgeId), null);
case ASSIGNED_TO_EDGE ->
pushMsgToCore(new DeviceEdgeUpdateMsg(tenantId, new DeviceId(entityId.getId()), edgeId), null);
case UNASSIGNED_FROM_EDGE -> {
EdgeId relatedEdgeId = findRelatedEdgeIdIfAny(tenantId, entityId);
pushMsgToCore(new DeviceEdgeUpdateMsg(tenantId, new DeviceId(entityId.getId()), relatedEdgeId), null);
@ -743,4 +746,33 @@ public class DefaultTbClusterService implements TbClusterService {
}
}
@Override
public void onCalculatedFieldUpdated(CalculatedField calculatedField, CalculatedField oldCalculatedField) {
var created = oldCalculatedField == null;
broadcastEntityChangeToTransport(calculatedField.getTenantId(), calculatedField.getId(), calculatedField, null);
broadcastEntityStateChangeEvent(calculatedField.getTenantId(), calculatedField.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
sendCalculatedFieldEvent(calculatedField.getTenantId(), calculatedField.getId(), created, !created, false);
}
@Override
public void onCalculatedFieldDeleted(TenantId tenantId, CalculatedField calculatedField, TbQueueCallback callback) {
CalculatedFieldId calculatedFieldId = calculatedField.getId();
broadcastEntityDeleteToTransport(tenantId, calculatedFieldId, calculatedField.getName(), callback);
sendCalculatedFieldEvent(tenantId, calculatedFieldId, false, false, true);
broadcastEntityStateChangeEvent(tenantId, calculatedFieldId, ComponentLifecycleEvent.DELETED);
}
private void sendCalculatedFieldEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, boolean added, boolean updated, boolean deleted) {
TransportProtos.CalculatedFieldMsgProto.Builder builder = TransportProtos.CalculatedFieldMsgProto.newBuilder();
builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
builder.setCalculatedFieldIdMSB(calculatedFieldId.getId().getMostSignificantBits());
builder.setCalculatedFieldIdLSB(calculatedFieldId.getId().getLeastSignificantBits());
builder.setAdded(added);
builder.setUpdated(updated);
builder.setDeleted(deleted);
TransportProtos.CalculatedFieldMsgProto msg = builder.build();
pushMsgToCore(tenantId, calculatedFieldId, ToCoreMsg.newBuilder().setCalculatedFieldMsg(msg).build(), null);
}
}

View File

@ -38,6 +38,7 @@ import org.thingsboard.server.common.data.alarm.AlarmInfo;
import org.thingsboard.server.common.data.event.ErrorEvent;
import org.thingsboard.server.common.data.event.Event;
import org.thingsboard.server.common.data.event.LifecycleEvent;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.NotificationRequestId;
import org.thingsboard.server.common.data.id.TenantId;
@ -85,6 +86,7 @@ import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.entitiy.cf.TbCalculatedFieldService;
import org.thingsboard.server.service.notification.NotificationSchedulerService;
import org.thingsboard.server.service.ota.OtaPackageStateService;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
@ -148,6 +150,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
private final TbImageService imageService;
private final RuleEngineCallService ruleEngineCallService;
private final TbCoreConsumerStats stats;
private final TbCalculatedFieldService calculatedFieldService;
private MainQueueConsumerManager<TbProtoQueueMsg<ToCoreMsg>, CoreQueueConfig> mainConsumer;
private QueueConsumerManager<TbProtoQueueMsg<ToUsageStatsServiceMsg>> usageStatsConsumer;
@ -175,7 +178,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
NotificationSchedulerService notificationSchedulerService,
NotificationRuleProcessor notificationRuleProcessor,
TbImageService imageService,
RuleEngineCallService ruleEngineCallService) {
RuleEngineCallService ruleEngineCallService,
TbCalculatedFieldService calculatedFieldService) {
super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, apiUsageStateService, partitionService,
eventPublisher, jwtSettingsService);
this.stateService = stateService;
@ -191,6 +195,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
this.imageService = imageService;
this.ruleEngineCallService = ruleEngineCallService;
this.queueFactory = tbCoreQueueFactory;
this.calculatedFieldService = calculatedFieldService;
}
@PostConstruct
@ -308,6 +313,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
forwardToEventService(toCoreMsg.getErrorEventMsg(), callback);
} else if (toCoreMsg.hasLifecycleEventMsg()) {
forwardToEventService(toCoreMsg.getLifecycleEventMsg(), callback);
} else if (toCoreMsg.hasCalculatedFieldMsg()) {
forwardToCalculatedFieldService(toCoreMsg.getCalculatedFieldMsg(), callback);
}
} catch (Throwable e) {
log.warn("[{}] Failed to process message: {}", id, msg, e);
@ -658,6 +665,18 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
});
}
private void forwardToCalculatedFieldService(TransportProtos.CalculatedFieldMsgProto calculatedFieldMsg, TbCallback callback) {
var tenantId = toTenantId(calculatedFieldMsg.getTenantIdMSB(), calculatedFieldMsg.getTenantIdLSB());
var calculatedFieldId = new CalculatedFieldId(new UUID(calculatedFieldMsg.getCalculatedFieldIdMSB(), calculatedFieldMsg.getCalculatedFieldIdLSB()));
ListenableFuture<?> future = deviceActivityEventsExecutor.submit(() -> calculatedFieldService.onCalculatedFieldMsg(calculatedFieldMsg, callback));
DonAsynchron.withCallback(future,
__ -> callback.onSuccess(),
t -> {
log.warn("[{}] Failed to process calculated field message for calculated field [{}]", tenantId.getId(), calculatedFieldId.getId(), t);
callback.onFailure(t);
});
}
private void forwardToNotificationSchedulerService(TransportProtos.NotificationSchedulerServiceMsg msg, TbCallback callback) {
TenantId tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB());
NotificationRequestId notificationRequestId = new NotificationRequestId(new UUID(msg.getRequestIdMSB(), msg.getRequestIdLSB()));

View File

@ -0,0 +1,52 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.utils;
import jakarta.annotation.PreDestroy;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class RocksDBConfig {
@Value("${rocksdb.db_path}")
private String dbPath;
private RocksDB db;
static {
RocksDB.loadLibrary();
}
public RocksDB getDb() throws RocksDBException {
if (db == null) {
Options options = new Options().setCreateIfMissing(true);
db = RocksDB.open(options, dbPath);
}
return db;
}
@PreDestroy
public void close() {
if (db != null) {
db.close();
db = null;
}
}
}

View File

@ -424,6 +424,10 @@ sql:
pool_size: "${SQL_RELATIONS_POOL_SIZE:4}" # This value has to be reasonably small to prevent the relation query from blocking all other DB calls
query_timeout: "${SQL_RELATIONS_QUERY_TIMEOUT_SEC:20}" # This value has to be reasonably small to prevent the relation query from blocking all other DB calls
rocksdb:
# Rocksdb path
db_path: "${ROCKS_DB_PATH:}"
# Actor system parameters
actors:
system:

View File

@ -21,6 +21,7 @@ import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.TbResourceInfo;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.EdgeId;
@ -114,4 +115,8 @@ public interface TbClusterService extends TbQueueClusterService {
void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action, EdgeId sourceEdgeId);
void onCalculatedFieldUpdated(CalculatedField calculatedField, CalculatedField oldCalculatedField);
void onCalculatedFieldDeleted(TenantId tenantId, CalculatedField calculatedField, TbQueueCallback callback);
}

View File

@ -1512,6 +1512,7 @@ message ToCoreMsg {
DeviceConnectProto deviceConnectMsg = 50;
DeviceDisconnectProto deviceDisconnectMsg = 51;
DeviceInactivityProto deviceInactivityMsg = 52;
CalculatedFieldMsgProto calculatedFieldMsg = 53;
}
/* High priority messages with low latency are handled by ThingsBoard Core Service separately */

View File

@ -29,20 +29,21 @@ import org.thingsboard.server.common.data.id.HasId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.entity.AbstractEntityService;
import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent;
import org.thingsboard.server.dao.service.DataValidator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.thingsboard.server.dao.entity.AbstractEntityService.checkConstraintViolation;
import static org.thingsboard.server.dao.service.Validator.validateId;
import static org.thingsboard.server.dao.service.Validator.validatePageLink;
@Service("CalculatedFieldDaoService")
@Slf4j
@RequiredArgsConstructor
public class BaseCalculatedFieldService implements CalculatedFieldService {
public class BaseCalculatedFieldService extends AbstractEntityService implements CalculatedFieldService {
public static final String INCORRECT_TENANT_ID = "Incorrect tenantId ";
public static final String INCORRECT_CALCULATED_FIELD_ID = "Incorrect calculatedFieldId ";
@ -60,6 +61,8 @@ public class BaseCalculatedFieldService implements CalculatedFieldService {
log.trace("Executing save calculated field, [{}]", calculatedField);
CalculatedField savedCalculatedField = calculatedFieldDao.save(tenantId, calculatedField);
createOrUpdateCalculatedFieldLink(tenantId, savedCalculatedField);
eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(savedCalculatedField.getTenantId()).entityId(savedCalculatedField.getId())
.entity(savedCalculatedField).created(calculatedField.getId() == null).build());
return savedCalculatedField;
} catch (Exception e) {
checkConstraintViolation(e,

View File

@ -166,6 +166,8 @@
<weisj-jsvg.version>1.6.1</weisj-jsvg.version>
<drewnoakes-metadata-extractor.version>2.19.0</drewnoakes-metadata-extractor.version>
<firebase-admin.version>9.2.0</firebase-admin.version>
<rocksdbjni.version>9.4.0</rocksdbjni.version>
</properties>
<modules>
@ -2272,6 +2274,11 @@
<artifactId>metadata-extractor</artifactId>
<version>${drewnoakes-metadata-extractor.version}</version>
</dependency>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>${rocksdbjni.version}</version>
</dependency>
</dependencies>
</dependencyManagement>