Merge branch 'master' of git://github.com/thingsboard/thingsboard
This commit is contained in:
commit
c3f38d6e86
@ -514,6 +514,9 @@ public class ActorSystemContext {
|
||||
appActor.tell(tbActorMsg);
|
||||
}
|
||||
|
||||
public void tellWithHighPriority(TbActorMsg tbActorMsg) {
|
||||
appActor.tellWithHighPriority(tbActorMsg);
|
||||
}
|
||||
|
||||
public void schedulePeriodicMsgWithDelay(TbActorRef ctx, TbActorMsg msg, long delayInMs, long periodInMs) {
|
||||
log.debug("Scheduling periodic msg {} every {} ms with delay {} ms", msg, periodInMs, delayInMs);
|
||||
|
||||
@ -82,12 +82,14 @@ public class AppActor extends ContextAwareActor {
|
||||
onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
|
||||
break;
|
||||
case TRANSPORT_TO_DEVICE_ACTOR_MSG:
|
||||
onToDeviceActorMsg((TenantAwareMsg) msg, false);
|
||||
break;
|
||||
case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
|
||||
case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
|
||||
case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
|
||||
case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
|
||||
case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
|
||||
onToDeviceActorMsg((TenantAwareMsg) msg);
|
||||
onToDeviceActorMsg((TenantAwareMsg) msg, true);
|
||||
break;
|
||||
default:
|
||||
return false;
|
||||
@ -155,15 +157,20 @@ public class AppActor extends ContextAwareActor {
|
||||
}
|
||||
}
|
||||
if (target != null) {
|
||||
target.tell(msg);
|
||||
target.tellWithHighPriority(msg);
|
||||
} else {
|
||||
log.debug("[{}] Invalid component lifecycle msg: {}", msg.getTenantId(), msg);
|
||||
}
|
||||
}
|
||||
|
||||
private void onToDeviceActorMsg(TenantAwareMsg msg) {
|
||||
private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) {
|
||||
if (!deletedTenants.contains(msg.getTenantId())) {
|
||||
getOrCreateTenantActor(msg.getTenantId()).tell(msg);
|
||||
TbActorRef tenantActor = getOrCreateTenantActor(msg.getTenantId());
|
||||
if (priority) {
|
||||
tenantActor.tellWithHighPriority(msg);
|
||||
} else {
|
||||
tenantActor.tell(msg);
|
||||
}
|
||||
} else {
|
||||
if (msg instanceof TransportToDeviceActorMsgWrapper) {
|
||||
((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess();
|
||||
|
||||
@ -128,7 +128,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
|
||||
} else {
|
||||
log.trace("[{}][{}] Updating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
|
||||
existing.setSelf(ruleNode);
|
||||
existing.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, existing.getSelf().getId(), ComponentLifecycleEvent.UPDATED));
|
||||
existing.getSelfActor().tellWithHighPriority(new ComponentLifecycleMsg(tenantId, existing.getSelf().getId(), ComponentLifecycleEvent.UPDATED));
|
||||
}
|
||||
}
|
||||
|
||||
@ -137,7 +137,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
|
||||
removedRules.forEach(ruleNodeId -> {
|
||||
log.trace("[{}][{}] Removing rule node [{}]", tenantId, entityId, ruleNodeId);
|
||||
RuleNodeCtx removed = nodeActors.remove(ruleNodeId);
|
||||
removed.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED));
|
||||
removed.getSelfActor().tellWithHighPriority(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED));
|
||||
});
|
||||
|
||||
initRoutes(ruleChain, ruleNodeList);
|
||||
@ -155,7 +155,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
|
||||
|
||||
@Override
|
||||
public void onPartitionChangeMsg(PartitionChangeMsg msg) {
|
||||
nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(actorRef -> actorRef.tell(msg));
|
||||
nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(actorRef -> actorRef.tellWithHighPriority(msg));
|
||||
}
|
||||
|
||||
private TbActorRef createRuleNodeActor(TbActorCtx ctx, RuleNode ruleNode) {
|
||||
|
||||
@ -83,7 +83,9 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
|
||||
public void destroy() {
|
||||
try {
|
||||
log.debug("[{}][{}][{}] Stopping processor.", tenantId, id, id.getEntityType());
|
||||
processor.stop(ctx);
|
||||
if (processor != null) {
|
||||
processor.stop(ctx);
|
||||
}
|
||||
logLifecycleEvent(ComponentLifecycleEvent.STOPPED);
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}][{}] Failed to stop {} processor: {}", tenantId, id, id.getEntityType(), e.getMessage());
|
||||
|
||||
@ -21,6 +21,7 @@ import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.actors.ActorSystemContext;
|
||||
import org.thingsboard.server.actors.DefaultTbActorSystem;
|
||||
import org.thingsboard.server.actors.TbActorId;
|
||||
@ -83,10 +84,10 @@ public class DefaultActorService implements ActorService {
|
||||
TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts);
|
||||
system = new DefaultTbActorSystem(settings);
|
||||
|
||||
system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(appDispatcherSize));
|
||||
system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(tenantDispatcherSize));
|
||||
system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(deviceDispatcherSize));
|
||||
system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(ruleDispatcherSize));
|
||||
system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(APP_DISPATCHER_NAME, appDispatcherSize));
|
||||
system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(TENANT_DISPATCHER_NAME, tenantDispatcherSize));
|
||||
system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(DEVICE_DISPATCHER_NAME, deviceDispatcherSize));
|
||||
system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(RULE_DISPATCHER_NAME, ruleDispatcherSize));
|
||||
|
||||
actorContext.setActorSystem(system);
|
||||
|
||||
@ -99,24 +100,28 @@ public class DefaultActorService implements ActorService {
|
||||
log.info("Actor system initialized.");
|
||||
}
|
||||
|
||||
private ExecutorService initDispatcherExecutor(int poolSize) {
|
||||
private ExecutorService initDispatcherExecutor(String dispatcherName, int poolSize) {
|
||||
if (poolSize == 0) {
|
||||
int cores = Runtime.getRuntime().availableProcessors();
|
||||
poolSize = Math.max(1, cores / 2);
|
||||
}
|
||||
return Executors.newWorkStealingPool(poolSize);
|
||||
if (poolSize == 1) {
|
||||
return Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(dispatcherName));
|
||||
} else {
|
||||
return Executors.newWorkStealingPool(poolSize);
|
||||
}
|
||||
}
|
||||
|
||||
@EventListener(ApplicationReadyEvent.class)
|
||||
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
|
||||
log.info("Received application ready event. Sending application init message to actor system");
|
||||
appActor.tell(new AppInitMsg());
|
||||
appActor.tellWithHighPriority(new AppInitMsg());
|
||||
}
|
||||
|
||||
@EventListener(PartitionChangeEvent.class)
|
||||
public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
|
||||
log.info("Received partition change event.");
|
||||
this.appActor.tell(new PartitionChangeMsg(partitionChangeEvent.getServiceQueueKey(), partitionChangeEvent.getPartitions()));
|
||||
this.appActor.tellWithHighPriority(new PartitionChangeMsg(partitionChangeEvent.getServiceQueueKey(), partitionChangeEvent.getPartitions()));
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
|
||||
@ -135,12 +135,14 @@ public class TenantActor extends RuleChainManagerActor {
|
||||
onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
|
||||
break;
|
||||
case TRANSPORT_TO_DEVICE_ACTOR_MSG:
|
||||
onToDeviceActorMsg((DeviceAwareMsg) msg, false);
|
||||
break;
|
||||
case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
|
||||
case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
|
||||
case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
|
||||
case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
|
||||
case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
|
||||
onToDeviceActorMsg((DeviceAwareMsg) msg);
|
||||
onToDeviceActorMsg((DeviceAwareMsg) msg, true);
|
||||
break;
|
||||
case RULE_CHAIN_TO_RULE_CHAIN_MSG:
|
||||
onRuleChainMsg((RuleChainAwareMsg) msg);
|
||||
@ -183,11 +185,16 @@ public class TenantActor extends RuleChainManagerActor {
|
||||
getOrCreateActor(msg.getRuleChainId()).tell(msg);
|
||||
}
|
||||
|
||||
private void onToDeviceActorMsg(DeviceAwareMsg msg) {
|
||||
private void onToDeviceActorMsg(DeviceAwareMsg msg, boolean priority) {
|
||||
if (!isCore) {
|
||||
log.warn("RECEIVED INVALID MESSAGE: {}", msg);
|
||||
}
|
||||
getOrCreateDeviceActor(msg.getDeviceId()).tell(msg);
|
||||
TbActorRef deviceActor = getOrCreateDeviceActor(msg.getDeviceId());
|
||||
if (priority) {
|
||||
deviceActor.tellWithHighPriority(msg);
|
||||
} else {
|
||||
deviceActor.tell(msg);
|
||||
}
|
||||
}
|
||||
|
||||
private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
|
||||
@ -199,7 +206,7 @@ public class TenantActor extends RuleChainManagerActor {
|
||||
findRuleChainById(tenantId, new RuleChainId(msg.getEntityId().getId()));
|
||||
visit(ruleChain, target);
|
||||
}
|
||||
target.tell(msg);
|
||||
target.tellWithHighPriority(msg);
|
||||
} else {
|
||||
log.debug("[{}] Invalid component lifecycle msg: {}", tenantId, msg);
|
||||
}
|
||||
|
||||
@ -207,7 +207,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreNotification.getComponentLifecycleMsg().toByteArray());
|
||||
if (actorMsg.isPresent()) {
|
||||
log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
|
||||
actorContext.tell(actorMsg.get());
|
||||
actorContext.tellWithHighPriority(actorMsg.get());
|
||||
}
|
||||
callback.onSuccess();
|
||||
}
|
||||
|
||||
@ -230,7 +230,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
||||
Optional<TbActorMsg> actorMsg = encodingService.decode(nfMsg.getComponentLifecycleMsg().toByteArray());
|
||||
if (actorMsg.isPresent()) {
|
||||
log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
|
||||
actorContext.tell(actorMsg.get());
|
||||
actorContext.tellWithHighPriority(actorMsg.get());
|
||||
}
|
||||
callback.onSuccess();
|
||||
} else if (nfMsg.hasFromDeviceRpcResponse()) {
|
||||
|
||||
@ -121,7 +121,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
|
||||
log.trace("[{}][{}] Processing local rpc call to device actor [{}]", request.getTenantId(), request.getId(), request.getDeviceId());
|
||||
UUID requestId = request.getId();
|
||||
localToDeviceRpcRequests.put(requestId, rpcMsg);
|
||||
actorContext.tell(rpcMsg);
|
||||
actorContext.tellWithHighPriority(rpcMsg);
|
||||
scheduleToDeviceTimeout(request, requestId);
|
||||
}
|
||||
|
||||
@ -175,7 +175,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
|
||||
}
|
||||
|
||||
private void scheduleToRuleEngineTimeout(ToDeviceRpcRequest request, UUID requestId) {
|
||||
long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis());
|
||||
long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()) + TimeUnit.SECONDS.toMillis(1);
|
||||
log.trace("[{}] processing to rule engine request.", requestId);
|
||||
scheduler.schedule(() -> {
|
||||
log.trace("[{}] timeout for processing to rule engine request.", requestId);
|
||||
@ -187,7 +187,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
|
||||
}
|
||||
|
||||
private void scheduleToDeviceTimeout(ToDeviceRpcRequest request, UUID requestId) {
|
||||
long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis());
|
||||
long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()) + TimeUnit.SECONDS.toMillis(1);
|
||||
log.trace("[{}] processing to device request.", requestId);
|
||||
scheduler.schedule(() -> {
|
||||
log.trace("[{}] timeout for to device request.", requestId);
|
||||
|
||||
@ -164,7 +164,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
|
||||
}
|
||||
|
||||
private void scheduleTimeout(ToDeviceRpcRequest request, UUID requestId) {
|
||||
long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis());
|
||||
long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()) + TimeUnit.SECONDS.toMillis(1);
|
||||
log.trace("[{}] processing the request: [{}]", this.hashCode(), requestId);
|
||||
scheduler.schedule(() -> {
|
||||
log.trace("[{}] timeout the request: [{}]", this.hashCode(), requestId);
|
||||
|
||||
@ -26,7 +26,9 @@ import java.util.Arrays;
|
||||
|
||||
@RunWith(ClasspathSuite.class)
|
||||
@ClasspathSuite.ClassnameFilters({
|
||||
"org.thingsboard.server.mqtt.rpc.sql.*Test", "org.thingsboard.server.mqtt.telemetry.sql.*Test"})
|
||||
"org.thingsboard.server.mqtt.rpc.sql.*Test",
|
||||
"org.thingsboard.server.mqtt.telemetry.sql.*Test"
|
||||
})
|
||||
public class MqttSqlTestSuite {
|
||||
|
||||
@ClassRule
|
||||
|
||||
@ -136,7 +136,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
|
||||
String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"24\",\"value\": 1},\"timeout\": 6000}";
|
||||
String deviceId = savedDevice.getId().getId().toString();
|
||||
|
||||
doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(),
|
||||
doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().is(409),
|
||||
asyncContextTimeoutToUseRpcPlugin);
|
||||
}
|
||||
|
||||
@ -193,7 +193,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
|
||||
String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"27\",\"value\": 1},\"timeout\": 6000}";
|
||||
String deviceId = savedDevice.getId().getId().toString();
|
||||
|
||||
doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(),
|
||||
doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().is(409),
|
||||
asyncContextTimeoutToUseRpcPlugin);
|
||||
}
|
||||
|
||||
|
||||
@ -132,19 +132,28 @@ public class DefaultTbActorSystem implements TbActorSystem {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tell(TbActorRef target, TbActorMsg actorMsg) {
|
||||
target.tell(actorMsg);
|
||||
public void tellWithHighPriority(TbActorId target, TbActorMsg actorMsg) {
|
||||
tell(target, actorMsg, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tell(TbActorId target, TbActorMsg actorMsg) {
|
||||
tell(target, actorMsg, false);
|
||||
}
|
||||
|
||||
private void tell(TbActorId target, TbActorMsg actorMsg, boolean highPriority) {
|
||||
TbActorMailbox mailbox = actors.get(target);
|
||||
if (mailbox == null) {
|
||||
throw new TbActorNotRegisteredException(target, "Actor with id [" + target + "] is not registered!");
|
||||
}
|
||||
mailbox.enqueue(actorMsg);
|
||||
if (highPriority) {
|
||||
mailbox.tellWithHighPriority(actorMsg);
|
||||
} else {
|
||||
mailbox.tell(actorMsg);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void broadcastToChildren(TbActorId parent, TbActorMsg msg) {
|
||||
broadcastToChildren(parent, id -> true, msg);
|
||||
|
||||
@ -29,6 +29,9 @@ import java.util.function.Supplier;
|
||||
@Slf4j
|
||||
@Data
|
||||
public final class TbActorMailbox implements TbActorCtx {
|
||||
private static final boolean HIGH_PRIORITY = true;
|
||||
private static final boolean NORMAL_PRIORITY = false;
|
||||
|
||||
private static final boolean FREE = false;
|
||||
private static final boolean BUSY = true;
|
||||
|
||||
@ -41,7 +44,8 @@ public final class TbActorMailbox implements TbActorCtx {
|
||||
private final TbActorRef parentRef;
|
||||
private final TbActor actor;
|
||||
private final Dispatcher dispatcher;
|
||||
private final ConcurrentLinkedQueue<TbActorMsg> msgs = new ConcurrentLinkedQueue<>();
|
||||
private final ConcurrentLinkedQueue<TbActorMsg> highPriorityMsgs = new ConcurrentLinkedQueue<>();
|
||||
private final ConcurrentLinkedQueue<TbActorMsg> normalPriorityMsgs = new ConcurrentLinkedQueue<>();
|
||||
private final AtomicBoolean busy = new AtomicBoolean(FREE);
|
||||
private final AtomicBoolean ready = new AtomicBoolean(NOT_READY);
|
||||
private final AtomicBoolean destroyInProgress = new AtomicBoolean();
|
||||
@ -50,7 +54,6 @@ public final class TbActorMailbox implements TbActorCtx {
|
||||
dispatcher.getExecutor().execute(() -> tryInit(1));
|
||||
}
|
||||
|
||||
|
||||
private void tryInit(int attempt) {
|
||||
try {
|
||||
log.debug("[{}] Trying to init actor, attempt: {}", selfId, attempt);
|
||||
@ -78,23 +81,38 @@ public final class TbActorMailbox implements TbActorCtx {
|
||||
}
|
||||
}
|
||||
|
||||
public void enqueue(TbActorMsg msg) {
|
||||
msgs.add(msg);
|
||||
private void enqueue(TbActorMsg msg, boolean highPriority) {
|
||||
if (highPriority) {
|
||||
highPriorityMsgs.add(msg);
|
||||
} else {
|
||||
normalPriorityMsgs.add(msg);
|
||||
}
|
||||
tryProcessQueue(true);
|
||||
}
|
||||
|
||||
private void tryProcessQueue(boolean newMsg) {
|
||||
if (ready.get() == READY && (newMsg || !msgs.isEmpty()) && busy.compareAndSet(FREE, BUSY)) {
|
||||
dispatcher.getExecutor().execute(this::processMailbox);
|
||||
if (ready.get() == READY) {
|
||||
if (newMsg || !highPriorityMsgs.isEmpty() || !normalPriorityMsgs.isEmpty()) {
|
||||
if (busy.compareAndSet(FREE, BUSY)) {
|
||||
dispatcher.getExecutor().execute(this::processMailbox);
|
||||
} else {
|
||||
log.trace("[{}] MessageBox is busy, new msg: {}", selfId, newMsg);
|
||||
}
|
||||
} else {
|
||||
log.trace("[{}] MessageBox is empty, new msg: {}", selfId, newMsg);
|
||||
}
|
||||
} else {
|
||||
log.trace("[{}] MessageBox is busy, new msg: {}", selfId, newMsg);
|
||||
log.trace("[{}] MessageBox is not ready, new msg: {}", selfId, newMsg);
|
||||
}
|
||||
}
|
||||
|
||||
private void processMailbox() {
|
||||
boolean noMoreElements = false;
|
||||
for (int i = 0; i < settings.getActorThroughput(); i++) {
|
||||
TbActorMsg msg = msgs.poll();
|
||||
TbActorMsg msg = highPriorityMsgs.poll();
|
||||
if (msg == null) {
|
||||
msg = normalPriorityMsgs.poll();
|
||||
}
|
||||
if (msg != null) {
|
||||
try {
|
||||
log.debug("[{}] Going to process message: {}", selfId, msg);
|
||||
@ -178,6 +196,12 @@ public final class TbActorMailbox implements TbActorCtx {
|
||||
|
||||
@Override
|
||||
public void tell(TbActorMsg actorMsg) {
|
||||
enqueue(actorMsg);
|
||||
enqueue(actorMsg, NORMAL_PRIORITY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tellWithHighPriority(TbActorMsg actorMsg) {
|
||||
enqueue(actorMsg, HIGH_PRIORITY);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -23,4 +23,6 @@ public interface TbActorRef {
|
||||
|
||||
void tell(TbActorMsg actorMsg);
|
||||
|
||||
void tellWithHighPriority(TbActorMsg actorMsg);
|
||||
|
||||
}
|
||||
|
||||
@ -36,10 +36,10 @@ public interface TbActorSystem {
|
||||
|
||||
TbActorRef createChildActor(String dispatcherId, TbActorCreator creator, TbActorId parent);
|
||||
|
||||
void tell(TbActorRef target, TbActorMsg actorMsg);
|
||||
|
||||
void tell(TbActorId target, TbActorMsg actorMsg);
|
||||
|
||||
void tellWithHighPriority(TbActorId target, TbActorMsg actorMsg);
|
||||
|
||||
void stop(TbActorRef actorRef);
|
||||
|
||||
void stop(TbActorId actorId);
|
||||
|
||||
@ -40,19 +40,19 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||
public class ActorSystemTest {
|
||||
|
||||
public static final String ROOT_DISPATCHER = "root-dispatcher";
|
||||
private static final int _1M = 1024 * 1024;
|
||||
private static final int _100K = 100 * 1024;
|
||||
|
||||
private TbActorSystem actorSystem;
|
||||
private ExecutorService submitPool;
|
||||
private volatile TbActorSystem actorSystem;
|
||||
private volatile ExecutorService submitPool;
|
||||
private int parallelism;
|
||||
|
||||
@Before
|
||||
public void initActorSystem() {
|
||||
int cores = Runtime.getRuntime().availableProcessors();
|
||||
int parallelism = Math.max(1, cores / 2);
|
||||
parallelism = Math.max(2, cores / 2);
|
||||
TbActorSystemSettings settings = new TbActorSystemSettings(5, parallelism, 42);
|
||||
actorSystem = new DefaultTbActorSystem(settings);
|
||||
submitPool = Executors.newWorkStealingPool(parallelism);
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
|
||||
}
|
||||
|
||||
@After
|
||||
@ -62,22 +62,44 @@ public class ActorSystemTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test10actorsAnd1MMessages() throws InterruptedException {
|
||||
testActorsAndMessages(10, _1M);
|
||||
public void test1actorsAnd100KMessages() throws InterruptedException {
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
|
||||
testActorsAndMessages(1, _100K, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test1MActorsAnd10Messages() throws InterruptedException {
|
||||
testActorsAndMessages(_1M, 10);
|
||||
public void test10actorsAnd100KMessages() throws InterruptedException {
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
|
||||
testActorsAndMessages(10, _100K, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test100KActorsAnd1Messages5timesSingleThread() throws InterruptedException {
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newSingleThreadExecutor());
|
||||
testActorsAndMessages(_100K, 1, 5);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test100KActorsAnd1Messages5times() throws InterruptedException {
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
|
||||
testActorsAndMessages(_100K, 1, 5);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test100KActorsAnd10Messages() throws InterruptedException {
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
|
||||
testActorsAndMessages(_100K, 10, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test1KActorsAnd1KMessages() throws InterruptedException {
|
||||
testActorsAndMessages(1000, 1000);
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
|
||||
testActorsAndMessages(1000, 1000, 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoMessagesAfterDestroy() throws InterruptedException {
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
|
||||
ActorTestCtx testCtx1 = getActorTestCtx(1);
|
||||
ActorTestCtx testCtx2 = getActorTestCtx(1);
|
||||
|
||||
@ -86,16 +108,17 @@ public class ActorSystemTest {
|
||||
TbActorRef actorId2 = actorSystem.createRootActor(ROOT_DISPATCHER, new SlowInitActor.SlowInitActorCreator(
|
||||
new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx2));
|
||||
|
||||
actorSystem.tell(actorId1, new IntTbActorMsg(42));
|
||||
actorSystem.tell(actorId2, new IntTbActorMsg(42));
|
||||
actorId1.tell(new IntTbActorMsg(42));
|
||||
actorId2.tell(new IntTbActorMsg(42));
|
||||
actorSystem.stop(actorId1);
|
||||
|
||||
Assert.assertTrue(testCtx2.getLatch().await(1, TimeUnit.SECONDS));
|
||||
Assert.assertFalse(testCtx1.getLatch().await(2, TimeUnit.SECONDS));
|
||||
Assert.assertFalse(testCtx1.getLatch().await(1, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOneActorCreated() throws InterruptedException {
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
|
||||
ActorTestCtx testCtx1 = getActorTestCtx(1);
|
||||
ActorTestCtx testCtx2 = getActorTestCtx(1);
|
||||
TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID()));
|
||||
@ -105,15 +128,16 @@ public class ActorSystemTest {
|
||||
Thread.sleep(1000);
|
||||
actorSystem.tell(actorId, new IntTbActorMsg(42));
|
||||
|
||||
Assert.assertTrue(testCtx1.getLatch().await(3, TimeUnit.SECONDS));
|
||||
Assert.assertFalse(testCtx2.getLatch().await(3, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(testCtx1.getLatch().await(1, TimeUnit.SECONDS));
|
||||
Assert.assertFalse(testCtx2.getLatch().await(1, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testActorCreatorCalledOnce() throws InterruptedException {
|
||||
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
|
||||
ActorTestCtx testCtx = getActorTestCtx(1);
|
||||
TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID()));
|
||||
for(int i =0; i < 1000; i++) {
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
submitPool.submit(() -> actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx)));
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
@ -125,7 +149,7 @@ public class ActorSystemTest {
|
||||
}
|
||||
|
||||
|
||||
public void testActorsAndMessages(int actorsCount, int msgNumber) throws InterruptedException {
|
||||
public void testActorsAndMessages(int actorsCount, int msgNumber, int times) throws InterruptedException {
|
||||
Random random = new Random();
|
||||
int[] randomIntegers = new int[msgNumber];
|
||||
long sumTmp = 0;
|
||||
@ -141,32 +165,35 @@ public class ActorSystemTest {
|
||||
List<TbActorRef> actorRefs = new ArrayList<>();
|
||||
for (int actorIdx = 0; actorIdx < actorsCount; actorIdx++) {
|
||||
ActorTestCtx testCtx = getActorTestCtx(msgNumber);
|
||||
|
||||
actorRefs.add(actorSystem.createRootActor(ROOT_DISPATCHER, new TestRootActor.TestRootActorCreator(
|
||||
new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx)));
|
||||
testCtxes.add(testCtx);
|
||||
}
|
||||
|
||||
long start = System.nanoTime();
|
||||
|
||||
for (int i = 0; i < msgNumber; i++) {
|
||||
int tmp = randomIntegers[i];
|
||||
submitPool.execute(() -> actorRefs.forEach(actorId -> actorSystem.tell(actorId, new IntTbActorMsg(tmp))));
|
||||
}
|
||||
log.info("Submitted all messages");
|
||||
|
||||
testCtxes.forEach(ctx -> {
|
||||
try {
|
||||
Assert.assertTrue(ctx.getLatch().await(1, TimeUnit.MINUTES));
|
||||
Assert.assertEquals(expected, ctx.getActual().get());
|
||||
Assert.assertEquals(msgNumber, ctx.getInvocationCount().get());
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
for (int t = 0; t < times; t++) {
|
||||
long start = System.nanoTime();
|
||||
for (int i = 0; i < msgNumber; i++) {
|
||||
int tmp = randomIntegers[i];
|
||||
submitPool.execute(() -> actorRefs.forEach(actorId -> actorId.tell(new IntTbActorMsg(tmp))));
|
||||
}
|
||||
});
|
||||
|
||||
long duration = System.nanoTime() - start;
|
||||
log.info("Time spend: {}ns ({} ms)", duration, TimeUnit.NANOSECONDS.toMillis(duration));
|
||||
log.info("Submitted all messages");
|
||||
testCtxes.forEach(ctx -> {
|
||||
try {
|
||||
boolean success = ctx.getLatch().await(1, TimeUnit.MINUTES);
|
||||
if (!success) {
|
||||
log.warn("Failed: {}, {}", ctx.getActual().get(), ctx.getInvocationCount().get());
|
||||
}
|
||||
Assert.assertTrue(success);
|
||||
Assert.assertEquals(expected, ctx.getActual().get());
|
||||
Assert.assertEquals(msgNumber, ctx.getInvocationCount().get());
|
||||
ctx.clear();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
long duration = System.nanoTime() - start;
|
||||
log.info("Time spend: {}ns ({} ms)", duration, TimeUnit.NANOSECONDS.toMillis(duration));
|
||||
}
|
||||
}
|
||||
|
||||
private ActorTestCtx getActorTestCtx(int i) {
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
*/
|
||||
package org.thingsboard.server.actors;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
@ -22,10 +23,17 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
public class ActorTestCtx {
|
||||
|
||||
private final CountDownLatch latch;
|
||||
private volatile CountDownLatch latch;
|
||||
private final AtomicInteger invocationCount;
|
||||
private final int expectedInvocationCount;
|
||||
private final AtomicLong actual;
|
||||
|
||||
public void clear() {
|
||||
latch = new CountDownLatch(1);
|
||||
invocationCount.set(0);
|
||||
actual.set(0L);
|
||||
}
|
||||
}
|
||||
|
||||
@ -51,6 +51,8 @@ public class TestRootActor extends AbstractTbActor {
|
||||
if (count == testCtx.getExpectedInvocationCount()) {
|
||||
testCtx.getActual().set(sum);
|
||||
testCtx.getInvocationCount().addAndGet(count);
|
||||
sum = 0;
|
||||
count = 0;
|
||||
testCtx.getLatch().countDown();
|
||||
}
|
||||
}
|
||||
|
||||
@ -95,8 +95,20 @@ public class TbKafkaNode implements TbNode {
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg.getMetaData());
|
||||
try {
|
||||
ctx.getExternalCallExecutor().executeAsync(() -> {
|
||||
publish(ctx, msg, topic);
|
||||
return null;
|
||||
});
|
||||
} catch (Exception e) {
|
||||
ctx.tellFailure(msg, e);
|
||||
}
|
||||
}
|
||||
|
||||
protected void publish(TbContext ctx, TbMsg msg, String topic) {
|
||||
try {
|
||||
if (!addMetadataKeyValuesAsKafkaHeaders) {
|
||||
//TODO: external system executor
|
||||
producer.send(new ProducerRecord<>(topic, msg.getData()),
|
||||
(metadata, e) -> processRecord(ctx, msg, metadata, e));
|
||||
} else {
|
||||
@ -105,9 +117,8 @@ public class TbKafkaNode implements TbNode {
|
||||
producer.send(new ProducerRecord<>(topic, null, null, null, msg.getData(), headers),
|
||||
(metadata, e) -> processRecord(ctx, msg, metadata, e));
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
ctx.tellFailure(msg, e);
|
||||
log.debug("[{}] Failed to process message: {}", ctx.getSelfId(), msg, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -73,6 +73,8 @@ import { DialogService } from '@core/services/dialog.service';
|
||||
import { CustomDialogService } from '@home/components/widget/dialog/custom-dialog.service';
|
||||
import { DatePipe } from '@angular/common';
|
||||
import { TranslateService } from '@ngx-translate/core';
|
||||
import { PageLink } from '@shared/models/page/page-link';
|
||||
import { SortOrder } from '@shared/models/page/sort-order';
|
||||
|
||||
export interface IWidgetAction {
|
||||
name: string;
|
||||
@ -297,6 +299,10 @@ export class WidgetContext {
|
||||
this.widgetTitle = undefined;
|
||||
this.widgetActions = undefined;
|
||||
}
|
||||
|
||||
pageLink(pageSize: number, page: number = 0, textSearch: string = null, sortOrder: SortOrder = null): PageLink {
|
||||
return new PageLink(pageSize, page, textSearch, sortOrder);
|
||||
};
|
||||
}
|
||||
|
||||
export interface IDynamicWidgetComponent {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user