modified relation-action-nodes
This commit is contained in:
parent
818e4b315a
commit
e8ee4b0d2b
@ -148,7 +148,7 @@ public class BaseTimeseriesService implements TimeseriesService {
|
||||
|
||||
private void saveAndRegisterFutures(TenantId tenantId, List<ListenableFuture<Void>> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
|
||||
if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
|
||||
throw new IncorrectParameterException("Telemetry data can't be stored for entity view. Only read only");
|
||||
throw new IncorrectParameterException("Telemetry data can't be stored for entity view. Read only");
|
||||
}
|
||||
futures.add(timeseriesDao.savePartition(tenantId, entityId, tsKvEntry.getTs(), tsKvEntry.getKey(), ttl));
|
||||
futures.add(timeseriesDao.saveLatest(tenantId, entityId, tsKvEntry));
|
||||
|
||||
@ -22,6 +22,7 @@ import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNode;
|
||||
@ -78,20 +79,20 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
withCallback(processEntityRelationAction(ctx, msg),
|
||||
filterResult -> ctx.tellNext(msg, filterResult ? SUCCESS : FAILURE), t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
|
||||
filterResult -> ctx.tellNext(filterResult.getMsg(), filterResult.isResult() ? SUCCESS : FAILURE), t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
}
|
||||
|
||||
protected ListenableFuture<Boolean> processEntityRelationAction(TbContext ctx, TbMsg msg) {
|
||||
protected ListenableFuture<RelationContainer> processEntityRelationAction(TbContext ctx, TbMsg msg) {
|
||||
return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer));
|
||||
}
|
||||
|
||||
protected abstract boolean createEntityIfNotExists();
|
||||
|
||||
protected abstract ListenableFuture<Boolean> doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entityContainer);
|
||||
protected abstract ListenableFuture<RelationContainer> doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entityContainer);
|
||||
|
||||
protected abstract C loadEntityNodeActionConfig(TbNodeConfiguration configuration) throws TbNodeException;
|
||||
|
||||
@ -119,9 +120,11 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA
|
||||
if (EntitySearchDirection.FROM.name().equals(config.getDirection())) {
|
||||
searchDirectionIds.setFromId(EntityIdFactory.getByTypeAndId(entityContainer.getEntityType().name(), entityContainer.getEntityId().toString()));
|
||||
searchDirectionIds.setToId(msg.getOriginator());
|
||||
searchDirectionIds.setOrignatorDirectionFrom(false);
|
||||
} else {
|
||||
searchDirectionIds.setToId(EntityIdFactory.getByTypeAndId(entityContainer.getEntityType().name(), entityContainer.getEntityId().toString()));
|
||||
searchDirectionIds.setFromId(msg.getOriginator());
|
||||
searchDirectionIds.setOrignatorDirectionFrom(true);
|
||||
}
|
||||
return searchDirectionIds;
|
||||
}
|
||||
@ -146,6 +149,7 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA
|
||||
protected static class SearchDirectionIds {
|
||||
private EntityId fromId;
|
||||
private EntityId toId;
|
||||
private boolean orignatorDirectionFrom;
|
||||
}
|
||||
|
||||
private static class EntityCacheLoader extends CacheLoader<EntityKey, EntityContainer> {
|
||||
@ -233,7 +237,15 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA
|
||||
}
|
||||
return targetEntity;
|
||||
}
|
||||
}
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
protected static class RelationContainer {
|
||||
|
||||
private TbMsg msg;
|
||||
private boolean result;
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -35,6 +35,9 @@ import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
@RuleNode(
|
||||
type = ComponentType.ACTION,
|
||||
@ -60,19 +63,59 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ListenableFuture<Boolean> doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entity) {
|
||||
return createIfAbsent(ctx, msg, entity);
|
||||
protected ListenableFuture<RelationContainer> doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entity) {
|
||||
ListenableFuture<Boolean> future = createIfAbsent(ctx, msg, entity);
|
||||
return Futures.transform(future, result -> {
|
||||
RelationContainer container = new RelationContainer();
|
||||
if (result && config.isChangeOriginatorToRelatedEntity()) {
|
||||
TbMsg tbMsg = ctx.transformMsg(msg, msg.getType(), entity.getEntityId(), msg.getMetaData(), msg.getData());
|
||||
container.setMsg(tbMsg);
|
||||
} else {
|
||||
container.setMsg(msg);
|
||||
}
|
||||
container.setResult(result);
|
||||
return container;
|
||||
});
|
||||
}
|
||||
|
||||
private ListenableFuture<Boolean> createIfAbsent(TbContext ctx, TbMsg msg, EntityContainer entityContainer) {
|
||||
SearchDirectionIds sdId = processSingleSearchDirection(msg, entityContainer);
|
||||
return Futures.transformAsync(ctx.getRelationService().checkRelation(ctx.getTenantId(), sdId.getFromId(), sdId.getToId(), config.getRelationType(), RelationTypeGroup.COMMON),
|
||||
result -> {
|
||||
ListenableFuture<Boolean> checkRelationFuture = Futures.transformAsync(ctx.getRelationService().checkRelation(ctx.getTenantId(), sdId.getFromId(), sdId.getToId(), config.getRelationType(), RelationTypeGroup.COMMON), result -> {
|
||||
if (!result) {
|
||||
if (config.isRemoveCurrentRelations()) {
|
||||
return processDeleteRelations(ctx, processFindRelations(ctx, msg, sdId));
|
||||
}
|
||||
}
|
||||
return Futures.immediateFuture(true);
|
||||
}, ctx.getDbCallbackExecutor());
|
||||
|
||||
return Futures.transformAsync(checkRelationFuture, result -> {
|
||||
if (!result) {
|
||||
return processCreateRelation(ctx, entityContainer, sdId);
|
||||
}
|
||||
return Futures.immediateFuture(true);
|
||||
});
|
||||
}, ctx.getDbCallbackExecutor());
|
||||
}
|
||||
|
||||
private ListenableFuture<List<EntityRelation>> processFindRelations(TbContext ctx, TbMsg msg, SearchDirectionIds sdId) {
|
||||
if (sdId.isOrignatorDirectionFrom()) {
|
||||
return ctx.getRelationService().findByFromAndTypeAsync(ctx.getTenantId(), msg.getOriginator(), config.getRelationType(), RelationTypeGroup.COMMON);
|
||||
} else {
|
||||
return ctx.getRelationService().findByToAndTypeAsync(ctx.getTenantId(), msg.getOriginator(), config.getRelationType(), RelationTypeGroup.COMMON);
|
||||
}
|
||||
}
|
||||
|
||||
private ListenableFuture<Boolean> processDeleteRelations(TbContext ctx, ListenableFuture<List<EntityRelation>> listListenableFuture) {
|
||||
return Futures.transformAsync(listListenableFuture, entityRelations -> {
|
||||
if (!entityRelations.isEmpty()) {
|
||||
List<ListenableFuture<Boolean>> list = new ArrayList<>();
|
||||
for (EntityRelation relation : entityRelations) {
|
||||
list.add(ctx.getRelationService().deleteRelationAsync(ctx.getTenantId(), relation));
|
||||
}
|
||||
return Futures.transform(Futures.allAsList(list), result -> false);
|
||||
}
|
||||
return Futures.immediateFuture(false);
|
||||
}, ctx.getDbCallbackExecutor());
|
||||
}
|
||||
|
||||
private ListenableFuture<Boolean> processCreateRelation(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId) {
|
||||
@ -96,17 +139,17 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR
|
||||
private ListenableFuture<Boolean> processView(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId) {
|
||||
return Futures.transformAsync(ctx.getEntityViewService().findEntityViewByIdAsync(ctx.getTenantId(), new EntityViewId(entityContainer.getEntityId().getId())), entityView -> {
|
||||
if (entityView != null) {
|
||||
return ctx.getRelationService().saveRelationAsync(ctx.getTenantId(), new EntityRelation(sdId.getFromId(), sdId.getToId(), config.getRelationType(), RelationTypeGroup.COMMON));
|
||||
return processSave(ctx, sdId);
|
||||
} else {
|
||||
return Futures.immediateFuture(true);
|
||||
}
|
||||
});
|
||||
}, ctx.getDbCallbackExecutor());
|
||||
}
|
||||
|
||||
private ListenableFuture<Boolean> processDevice(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId) {
|
||||
return Futures.transformAsync(ctx.getDeviceService().findDeviceByIdAsync(ctx.getTenantId(), new DeviceId(entityContainer.getEntityId().getId())), device -> {
|
||||
if (device != null) {
|
||||
return ctx.getRelationService().saveRelationAsync(ctx.getTenantId(), new EntityRelation(sdId.getFromId(), sdId.getToId(), config.getRelationType(), RelationTypeGroup.COMMON));
|
||||
return processSave(ctx, sdId);
|
||||
} else {
|
||||
return Futures.immediateFuture(true);
|
||||
}
|
||||
@ -116,40 +159,45 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR
|
||||
private ListenableFuture<Boolean> processAsset(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId) {
|
||||
return Futures.transformAsync(ctx.getAssetService().findAssetByIdAsync(ctx.getTenantId(), new AssetId(entityContainer.getEntityId().getId())), asset -> {
|
||||
if (asset != null) {
|
||||
return ctx.getRelationService().saveRelationAsync(ctx.getTenantId(), new EntityRelation(sdId.getFromId(), sdId.getToId(), config.getRelationType(), RelationTypeGroup.COMMON));
|
||||
return processSave(ctx, sdId);
|
||||
} else {
|
||||
return Futures.immediateFuture(true);
|
||||
}
|
||||
});
|
||||
}, ctx.getDbCallbackExecutor());
|
||||
}
|
||||
|
||||
private ListenableFuture<Boolean> processCustomer(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId) {
|
||||
return Futures.transformAsync(ctx.getCustomerService().findCustomerByIdAsync(ctx.getTenantId(), new CustomerId(entityContainer.getEntityId().getId())), customer -> {
|
||||
if (customer != null) {
|
||||
return ctx.getRelationService().saveRelationAsync(ctx.getTenantId(), new EntityRelation(sdId.getFromId(), sdId.getToId(), config.getRelationType(), RelationTypeGroup.COMMON));
|
||||
return processSave(ctx, sdId);
|
||||
} else {
|
||||
return Futures.immediateFuture(true);
|
||||
}
|
||||
});
|
||||
}, ctx.getDbCallbackExecutor());
|
||||
}
|
||||
|
||||
private ListenableFuture<Boolean> processDashboard(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId) {
|
||||
return Futures.transformAsync(ctx.getDashboardService().findDashboardByIdAsync(ctx.getTenantId(), new DashboardId(entityContainer.getEntityId().getId())), dashboard -> {
|
||||
if (dashboard != null) {
|
||||
return ctx.getRelationService().saveRelationAsync(ctx.getTenantId(), new EntityRelation(sdId.getFromId(), sdId.getToId(), config.getRelationType(), RelationTypeGroup.COMMON));
|
||||
return processSave(ctx, sdId);
|
||||
} else {
|
||||
return Futures.immediateFuture(true);
|
||||
}
|
||||
});
|
||||
}, ctx.getDbCallbackExecutor());
|
||||
}
|
||||
|
||||
private ListenableFuture<Boolean> processTenant(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId) {
|
||||
return Futures.transformAsync(ctx.getTenantService().findTenantByIdAsync(ctx.getTenantId(), new TenantId(entityContainer.getEntityId().getId())), tenant -> {
|
||||
if (tenant != null) {
|
||||
return ctx.getRelationService().saveRelationAsync(ctx.getTenantId(), new EntityRelation(sdId.getFromId(), sdId.getToId(), config.getRelationType(), RelationTypeGroup.COMMON));
|
||||
return processSave(ctx, sdId);
|
||||
} else {
|
||||
return Futures.immediateFuture(true);
|
||||
}
|
||||
});
|
||||
}, ctx.getDbCallbackExecutor());
|
||||
}
|
||||
|
||||
private ListenableFuture<Boolean> processSave(TbContext ctx, SearchDirectionIds sdId) {
|
||||
return ctx.getRelationService().saveRelationAsync(ctx.getTenantId(), new EntityRelation(sdId.getFromId(), sdId.getToId(), config.getRelationType(), RelationTypeGroup.COMMON));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -23,6 +23,8 @@ import org.thingsboard.server.common.data.relation.EntitySearchDirection;
|
||||
public class TbCreateRelationNodeConfiguration extends TbAbstractRelationActionNodeConfiguration implements NodeConfiguration<TbCreateRelationNodeConfiguration> {
|
||||
|
||||
private boolean createEntityIfNotExists;
|
||||
private boolean changeOriginatorToRelatedEntity;
|
||||
private boolean removeCurrentRelations;
|
||||
|
||||
@Override
|
||||
public TbCreateRelationNodeConfiguration defaultConfiguration() {
|
||||
@ -32,6 +34,8 @@ public class TbCreateRelationNodeConfiguration extends TbAbstractRelationActionN
|
||||
configuration.setEntityNamePattern("");
|
||||
configuration.setEntityCacheExpiration(300);
|
||||
configuration.setCreateEntityIfNotExists(false);
|
||||
configuration.setRemoveCurrentRelations(false);
|
||||
configuration.setChangeOriginatorToRelatedEntity(false);
|
||||
return configuration;
|
||||
}
|
||||
}
|
||||
|
||||
@ -42,7 +42,7 @@ import java.util.List;
|
||||
" if 'Delete single entity' is set to true, otherwise rule node will delete all relations to the originator of the message by type and direction.",
|
||||
nodeDetails = "If the relation(s) successfully deleted - Message send via <b>Success</b> chain, otherwise <b>Failure</b> chain will be used.",
|
||||
uiResources = {"static/rulenode/rulenode-core-config.js"},
|
||||
configDirective ="tbActionNodeDeleteRelationConfig",
|
||||
configDirective = "tbActionNodeDeleteRelationConfig",
|
||||
icon = "remove_circle"
|
||||
)
|
||||
public class TbDeleteRelationNode extends TbAbstractRelationActionNode<TbDeleteRelationNodeConfiguration> {
|
||||
@ -58,26 +58,30 @@ public class TbDeleteRelationNode extends TbAbstractRelationActionNode<TbDeleteR
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ListenableFuture<Boolean> processEntityRelationAction(TbContext ctx, TbMsg msg) {
|
||||
if(config.isDeleteForSingleEntity()){
|
||||
protected ListenableFuture<RelationContainer> processEntityRelationAction(TbContext ctx, TbMsg msg) {
|
||||
return getRelationContainerListenableFuture(ctx, msg);
|
||||
}
|
||||
|
||||
private ListenableFuture<RelationContainer> getRelationContainerListenableFuture(TbContext ctx, TbMsg msg) {
|
||||
if (config.isDeleteForSingleEntity()) {
|
||||
return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer));
|
||||
} else {
|
||||
return processList(ctx, msg);
|
||||
return Futures.transform(processList(ctx, msg), result -> new RelationContainer(msg, result));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ListenableFuture<Boolean> doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entityContainer) {
|
||||
return processSingle(ctx, msg, entityContainer);
|
||||
protected ListenableFuture<RelationContainer> doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entityContainer) {
|
||||
return Futures.transform(processSingle(ctx, msg, entityContainer), result -> new RelationContainer(msg, result));
|
||||
}
|
||||
|
||||
private ListenableFuture<Boolean> processList(TbContext ctx, TbMsg msg) {
|
||||
return Futures.transformAsync(processListSearchDirection(ctx, msg), entityRelations -> {
|
||||
if(entityRelations.isEmpty()){
|
||||
if (entityRelations.isEmpty()) {
|
||||
return Futures.immediateFuture(true);
|
||||
} else {
|
||||
List<ListenableFuture<Boolean>> listenableFutureList = new ArrayList<>();
|
||||
for (EntityRelation entityRelation: entityRelations) {
|
||||
for (EntityRelation entityRelation : entityRelations) {
|
||||
listenableFutureList.add(ctx.getRelationService().deleteRelationAsync(ctx.getTenantId(), entityRelation));
|
||||
}
|
||||
return Futures.transformAsync(Futures.allAsList(listenableFutureList), booleans -> {
|
||||
|
||||
File diff suppressed because one or more lines are too long
Loading…
x
Reference in New Issue
Block a user