Changed direct executors to db executors

This commit is contained in:
Volodymyr Babak 2020-05-06 12:17:41 +03:00
parent 7a633ed60a
commit d67ab52309
3 changed files with 10 additions and 13 deletions

View File

@ -20,7 +20,6 @@ import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@ -88,7 +87,7 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA
}
protected ListenableFuture<RelationContainer> processEntityRelationAction(TbContext ctx, TbMsg msg, String relationType) {
return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer, relationType), MoreExecutors.directExecutor());
return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer, relationType), ctx.getDbCallbackExecutor());
}
protected abstract boolean createEntityIfNotExists();

View File

@ -17,7 +17,6 @@ package org.thingsboard.rule.engine.action;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
@ -80,7 +79,7 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR
}
container.setResult(result);
return container;
}, MoreExecutors.directExecutor());
}, ctx.getDbCallbackExecutor());
}
private ListenableFuture<Boolean> createIfAbsent(TbContext ctx, TbMsg msg, EntityContainer entityContainer, String relationType) {
@ -118,7 +117,7 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR
for (EntityRelation relation : entityRelations) {
list.add(ctx.getRelationService().deleteRelationAsync(ctx.getTenantId(), relation));
}
return Futures.transform(Futures.allAsList(list), result -> false, MoreExecutors.directExecutor());
return Futures.transform(Futures.allAsList(list), result -> false, ctx.getDbCallbackExecutor());
}
return Futures.immediateFuture(false);
}, ctx.getDbCallbackExecutor());
@ -159,7 +158,7 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR
} else {
return Futures.immediateFuture(true);
}
}, MoreExecutors.directExecutor());
}, ctx.getDbCallbackExecutor());
}
private ListenableFuture<Boolean> processAsset(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId, String relationType) {

View File

@ -17,7 +17,6 @@ package org.thingsboard.rule.engine.action;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
@ -65,14 +64,14 @@ public class TbDeleteRelationNode extends TbAbstractRelationActionNode<TbDeleteR
@Override
protected ListenableFuture<RelationContainer> doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entityContainer, String relationType) {
return Futures.transform(processSingle(ctx, msg, entityContainer, relationType), result -> new RelationContainer(msg, result), MoreExecutors.directExecutor());
return Futures.transform(processSingle(ctx, msg, entityContainer, relationType), result -> new RelationContainer(msg, result), ctx.getDbCallbackExecutor());
}
private ListenableFuture<RelationContainer> getRelationContainerListenableFuture(TbContext ctx, TbMsg msg, String relationType) {
if (config.isDeleteForSingleEntity()) {
return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer, relationType), MoreExecutors.directExecutor());
return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer, relationType), ctx.getDbCallbackExecutor());
} else {
return Futures.transform(processList(ctx, msg), result -> new RelationContainer(msg, result), MoreExecutors.directExecutor());
return Futures.transform(processList(ctx, msg), result -> new RelationContainer(msg, result), ctx.getDbCallbackExecutor());
}
}
@ -92,9 +91,9 @@ public class TbDeleteRelationNode extends TbAbstractRelationActionNode<TbDeleteR
}
}
return Futures.immediateFuture(true);
}, MoreExecutors.directExecutor());
}, ctx.getDbCallbackExecutor());
}
}, MoreExecutors.directExecutor());
}, ctx.getDbCallbackExecutor());
}
private ListenableFuture<Boolean> processSingle(TbContext ctx, TbMsg msg, EntityContainer entityContainer, String relationType) {
@ -105,7 +104,7 @@ public class TbDeleteRelationNode extends TbAbstractRelationActionNode<TbDeleteR
return processSingleDeleteRelation(ctx, sdId, relationType);
}
return Futures.immediateFuture(true);
}, MoreExecutors.directExecutor());
}, ctx.getDbCallbackExecutor());
}
private ListenableFuture<Boolean> processSingleDeleteRelation(TbContext ctx, SearchDirectionIds sdId, String relationType) {