implemented partitioning
This commit is contained in:
parent
f929de4209
commit
5c5dc474cb
@ -29,7 +29,9 @@ public interface CalculatedFieldExecutionService {
|
||||
|
||||
void onTelemetryUpdate(TenantId tenantId, EntityId entityId, List<? extends KvEntry> telemetry);
|
||||
|
||||
void onEntityProfileChanged(TransportProtos.EntityProfileUpdateMsgProto proto, TbCallback callback);
|
||||
void onCalculatedFieldStateMsg(TransportProtos.CalculatedFieldStateMsgProto proto, TbCallback callback);
|
||||
|
||||
void onEntityProfileChangedMsg(TransportProtos.EntityProfileUpdateMsgProto proto, TbCallback callback);
|
||||
|
||||
void onProfileEntityMsg(TransportProtos.ProfileEntityMsgProto proto, TbCallback callback);
|
||||
|
||||
|
||||
@ -98,6 +98,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.thingsboard.server.common.data.DataConstants.SCOPE;
|
||||
|
||||
@ -184,7 +185,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
try {
|
||||
tpi = partitionService.resolve(ServiceType.TB_CORE, cf.getTenantId(), cf.getId());
|
||||
} catch (Exception e) {
|
||||
log.warn("Failed to resolve partition for CalculatedField [{}], tenant [{}]. Reason: {}",
|
||||
log.warn("Failed to resolve partition for CalculatedField [{}], tenant id [{}]. Reason: {}",
|
||||
cf.getId(), cf.getTenantId(), e.getMessage());
|
||||
continue;
|
||||
}
|
||||
@ -233,8 +234,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
|
||||
@Override
|
||||
protected void cleanupEntityOnPartitionRemoval(CalculatedFieldId entityId) {
|
||||
calculatedFields.remove(entityId);
|
||||
states.keySet().removeIf(ctxId -> ctxId.cfId().equals(entityId.getId()));
|
||||
cleanupEntity(entityId);
|
||||
}
|
||||
|
||||
private void cleanupEntity(CalculatedFieldId calculatedFieldId) {
|
||||
calculatedFields.remove(calculatedFieldId);
|
||||
states.keySet().removeIf(ctxId -> ctxId.cfId().equals(calculatedFieldId.getId()));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -245,7 +250,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
log.info("Received CalculatedFieldMsgProto for processing: tenantId=[{}], calculatedFieldId=[{}]", tenantId, calculatedFieldId);
|
||||
if (proto.getDeleted()) {
|
||||
log.warn("Executing onCalculatedFieldDelete, calculatedFieldId=[{}]", calculatedFieldId);
|
||||
onCalculatedFieldDelete(calculatedFieldId, callback);
|
||||
onCalculatedFieldDelete(tenantId, calculatedFieldId, callback);
|
||||
callback.onSuccess();
|
||||
}
|
||||
CalculatedField cf = getOrFetchFromDb(tenantId, calculatedFieldId);
|
||||
@ -364,7 +369,34 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEntityProfileChanged(TransportProtos.EntityProfileUpdateMsgProto proto, TbCallback callback) {
|
||||
public void onCalculatedFieldStateMsg(TransportProtos.CalculatedFieldStateMsgProto proto, TbCallback callback) {
|
||||
try {
|
||||
TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB()));
|
||||
CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(proto.getCalculatedFieldIdMSB(), proto.getCalculatedFieldIdLSB()));
|
||||
EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
|
||||
String state = proto.getState();
|
||||
CalculatedFieldEntityCtx calculatedFieldEntityCtx = state.isEmpty() ? JacksonUtil.fromString(state, CalculatedFieldEntityCtx.class) : null;
|
||||
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, calculatedFieldId);
|
||||
if (tpi.isMyPartition()) {
|
||||
CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(calculatedFieldId.getId(), entityId.getId());
|
||||
if (calculatedFieldEntityCtx != null) {
|
||||
states.put(ctxId, calculatedFieldEntityCtx);
|
||||
rocksDBService.put(JacksonUtil.writeValueAsString(ctxId), state);
|
||||
} else {
|
||||
states.remove(ctxId);
|
||||
rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId));
|
||||
}
|
||||
} else {
|
||||
log.debug("[{}] Calculated Field belongs to external partition {}", calculatedFieldId, tpi.getFullTopicName());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.trace("Failed to process calculated field update state msg: [{}]", proto, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEntityProfileChangedMsg(TransportProtos.EntityProfileUpdateMsgProto proto, TbCallback callback) {
|
||||
try {
|
||||
TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB()));
|
||||
EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
|
||||
@ -377,9 +409,14 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
|
||||
calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, oldProfileId)
|
||||
.forEach(cfId -> {
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, cfId);
|
||||
if (tpi.isMyPartition()) {
|
||||
CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId());
|
||||
states.remove(ctxId);
|
||||
rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId));
|
||||
} else {
|
||||
sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, null);
|
||||
}
|
||||
});
|
||||
|
||||
initializeStateForEntityByProfile(tenantId, entityId, newProfileId, callback);
|
||||
@ -398,12 +435,22 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
if (proto.getDeleted()) {
|
||||
log.info("Executing profile entity deleted msg, tenantId=[{}], entityId=[{}]", tenantId, entityId);
|
||||
profileEntities.get(profileId).remove(entityId);
|
||||
List<String> statesToRemove = states.keySet().stream()
|
||||
.filter(ctxEntityId -> ctxEntityId.entityId().equals(entityId.getId()))
|
||||
.map(JacksonUtil::writeValueAsString)
|
||||
.toList();
|
||||
states.keySet().removeIf(ctxEntityId -> ctxEntityId.entityId().equals(entityId.getId()));
|
||||
rocksDBService.deleteAll(statesToRemove);
|
||||
List<CalculatedFieldId> calculatedFieldIds = Stream.concat(
|
||||
calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, entityId).stream()
|
||||
.map(CalculatedFieldLink::getCalculatedFieldId),
|
||||
calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, profileId).stream()
|
||||
.map(CalculatedFieldLink::getCalculatedFieldId)
|
||||
).toList();
|
||||
calculatedFieldIds.forEach(cfId -> {
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, cfId);
|
||||
if (tpi.isMyPartition()) {
|
||||
CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId());
|
||||
states.remove(ctxId);
|
||||
rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId));
|
||||
} else {
|
||||
sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, null);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
log.info("Executing profile entity added msg, tenantId=[{}], entityId=[{}]", tenantId, entityId);
|
||||
profileEntities.computeIfAbsent(profileId, id -> new HashSet<>()).add(entityId);
|
||||
@ -414,11 +461,26 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
}
|
||||
}
|
||||
|
||||
private void sendUpdateCalculatedFieldStateMsg(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, CalculatedFieldState calculatedFieldState) {
|
||||
TransportProtos.CalculatedFieldStateMsgProto.Builder msgBuilder = TransportProtos.CalculatedFieldStateMsgProto.newBuilder()
|
||||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
||||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
|
||||
.setCalculatedFieldIdMSB(calculatedFieldId.getId().getMostSignificantBits())
|
||||
.setCalculatedFieldIdLSB(calculatedFieldId.getId().getLeastSignificantBits())
|
||||
.setEntityType(entityId.getEntityType().name())
|
||||
.setEntityIdMSB(entityId.getId().getMostSignificantBits())
|
||||
.setEntityIdLSB(entityId.getId().getLeastSignificantBits());
|
||||
if (calculatedFieldState != null) {
|
||||
msgBuilder.setState(JacksonUtil.writeValueAsString(calculatedFieldState));
|
||||
}
|
||||
clusterService.pushMsgToCore(tenantId, entityId, TransportProtos.ToCoreMsg.newBuilder().setCalculatedFieldStateMsg(msgBuilder).build(), null);
|
||||
}
|
||||
|
||||
private boolean onCalculatedFieldUpdate(CalculatedField updatedCalculatedField, TbCallback callback) {
|
||||
CalculatedField oldCalculatedField = getOrFetchFromDb(updatedCalculatedField.getTenantId(), updatedCalculatedField.getId());
|
||||
boolean shouldReinit = true;
|
||||
if (hasSignificantChanges(oldCalculatedField, updatedCalculatedField)) {
|
||||
onCalculatedFieldDelete(updatedCalculatedField.getId(), callback);
|
||||
onCalculatedFieldDelete(updatedCalculatedField.getTenantId(), updatedCalculatedField.getId(), callback);
|
||||
} else {
|
||||
calculatedFields.put(updatedCalculatedField.getId(), updatedCalculatedField);
|
||||
calculatedFieldsCtx.put(updatedCalculatedField.getId(), new CalculatedFieldCtx(updatedCalculatedField, tbelInvokeService));
|
||||
@ -428,8 +490,14 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
return shouldReinit;
|
||||
}
|
||||
|
||||
private void onCalculatedFieldDelete(CalculatedFieldId calculatedFieldId, TbCallback callback) {
|
||||
private void onCalculatedFieldDelete(TenantId tenantId, CalculatedFieldId calculatedFieldId, TbCallback callback) {
|
||||
try {
|
||||
cleanupEntity(calculatedFieldId);
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, calculatedFieldId);
|
||||
Set<CalculatedFieldId> calculatedFieldIds = partitionedEntities.get(tpi);
|
||||
if (calculatedFieldIds != null) {
|
||||
calculatedFieldIds.remove(calculatedFieldId);
|
||||
}
|
||||
calculatedFields.remove(calculatedFieldId);
|
||||
calculatedFieldsCtx.remove(calculatedFieldId);
|
||||
states.keySet().removeIf(ctxId -> ctxId.cfId().equals(calculatedFieldId.getId()));
|
||||
@ -606,7 +674,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
}
|
||||
|
||||
private void updateOrInitializeState(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, Map<String, ArgumentEntry> argumentValues) {
|
||||
CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(calculatedFieldCtx.getCfId().getId(), entityId.getId());
|
||||
CalculatedFieldId cfId = calculatedFieldCtx.getCfId();
|
||||
CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId());
|
||||
CalculatedFieldEntityCtx calculatedFieldEntityCtx = states.computeIfAbsent(entityCtxId, this::fetchCalculatedFieldEntityState);
|
||||
|
||||
CalculatedFieldState state = calculatedFieldEntityCtx.getState();
|
||||
@ -616,8 +685,14 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
}
|
||||
if (state.updateState(argumentValues)) {
|
||||
calculatedFieldEntityCtx.setState(state);
|
||||
TenantId tenantId = calculatedFieldCtx.getTenantId();
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, calculatedFieldCtx.getCfId());
|
||||
if (tpi.isMyPartition()) {
|
||||
states.put(entityCtxId, calculatedFieldEntityCtx);
|
||||
rocksDBService.put(JacksonUtil.writeValueAsString(entityCtxId), JacksonUtil.writeValueAsString(calculatedFieldEntityCtx));
|
||||
} else {
|
||||
sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, state);
|
||||
}
|
||||
|
||||
boolean allArgsPresent = calculatedFieldCtx.getArguments().keySet().containsAll(state.getArguments().keySet());
|
||||
if (allArgsPresent) {
|
||||
@ -626,7 +701,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
@Override
|
||||
public void onSuccess(CalculatedFieldResult result) {
|
||||
if (result != null) {
|
||||
pushMsgToRuleEngine(calculatedFieldCtx.getTenantId(), entityId, result);
|
||||
pushMsgToRuleEngine(tenantId, entityId, result);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -322,6 +322,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
forwardToCalculatedFieldService(toCoreMsg.getEntityProfileUpdateMsg(), callback);
|
||||
} else if (toCoreMsg.hasProfileEntityMsg()) {
|
||||
forwardToCalculatedFieldService(toCoreMsg.getProfileEntityMsg(), callback);
|
||||
} else if (toCoreMsg.hasCalculatedFieldStateMsg()) {
|
||||
forwardToCalculatedFieldService(toCoreMsg.getCalculatedFieldStateMsg(), callback);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
log.warn("[{}] Failed to process message: {}", id, msg, e);
|
||||
@ -687,7 +689,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
private void forwardToCalculatedFieldService(TransportProtos.EntityProfileUpdateMsgProto profileUpdateMsg, TbCallback callback) {
|
||||
var tenantId = toTenantId(profileUpdateMsg.getTenantIdMSB(), profileUpdateMsg.getTenantIdLSB());
|
||||
var entityId = EntityIdFactory.getByTypeAndUuid(profileUpdateMsg.getEntityProfileType(), new UUID(profileUpdateMsg.getEntityIdMSB(), profileUpdateMsg.getEntityIdLSB()));
|
||||
ListenableFuture<?> future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onEntityProfileChanged(profileUpdateMsg, callback));
|
||||
ListenableFuture<?> future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onEntityProfileChangedMsg(profileUpdateMsg, callback));
|
||||
DonAsynchron.withCallback(future,
|
||||
__ -> callback.onSuccess(),
|
||||
t -> {
|
||||
@ -708,6 +710,18 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
});
|
||||
}
|
||||
|
||||
private void forwardToCalculatedFieldService(TransportProtos.CalculatedFieldStateMsgProto calculatedFieldStateMsgProto, TbCallback callback) {
|
||||
var tenantId = toTenantId(calculatedFieldStateMsgProto.getTenantIdMSB(), calculatedFieldStateMsgProto.getTenantIdLSB());
|
||||
var calculatedFieldId = new CalculatedFieldId(new UUID(calculatedFieldStateMsgProto.getCalculatedFieldIdMSB(), calculatedFieldStateMsgProto.getCalculatedFieldIdLSB()));
|
||||
ListenableFuture<?> future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onCalculatedFieldStateMsg(calculatedFieldStateMsgProto, callback));
|
||||
DonAsynchron.withCallback(future,
|
||||
__ -> callback.onSuccess(),
|
||||
t -> {
|
||||
log.warn("[{}] Failed to process calculated field state message for entityId [{}]", tenantId.getId(), calculatedFieldId.getId(), t);
|
||||
callback.onFailure(t);
|
||||
});
|
||||
}
|
||||
|
||||
private void forwardToNotificationSchedulerService(TransportProtos.NotificationSchedulerServiceMsg msg, TbCallback callback) {
|
||||
TenantId tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB());
|
||||
NotificationRequestId notificationRequestId = new NotificationRequestId(new UUID(msg.getRequestIdMSB(), msg.getRequestIdLSB()));
|
||||
|
||||
@ -807,6 +807,17 @@ message ProfileEntityMsgProto {
|
||||
bool deleted = 10;
|
||||
}
|
||||
|
||||
message CalculatedFieldStateMsgProto {
|
||||
int64 tenantIdMSB = 1;
|
||||
int64 tenantIdLSB = 2;
|
||||
int64 calculatedFieldIdMSB = 3;
|
||||
int64 calculatedFieldIdLSB = 4;
|
||||
string entityType = 5;
|
||||
int64 entityIdMSB = 6;
|
||||
int64 entityIdLSB = 7;
|
||||
string state = 8;
|
||||
}
|
||||
|
||||
//Used to report session state to tb-Service and persist this state in the cache on the tb-Service level.
|
||||
message SubscriptionInfoProto {
|
||||
int64 lastActivityTime = 1;
|
||||
@ -1541,6 +1552,7 @@ message ToCoreMsg {
|
||||
CalculatedFieldMsgProto calculatedFieldMsg = 53;
|
||||
EntityProfileUpdateMsgProto entityProfileUpdateMsg = 54;
|
||||
ProfileEntityMsgProto profileEntityMsg = 55;
|
||||
CalculatedFieldStateMsgProto calculatedFieldStateMsg = 56;
|
||||
}
|
||||
|
||||
/* High priority messages with low latency are handled by ThingsBoard Core Service separately */
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user