Minor improvements
This commit is contained in:
parent
e1f914e2fa
commit
b1f87206f3
@ -192,6 +192,9 @@ class DefaultTbContext implements TbContext {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void ack(TbMsg tbMsg) {
|
public void ack(TbMsg tbMsg) {
|
||||||
|
if (nodeCtx.getSelf().isDebugMode()) {
|
||||||
|
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "ACK", null);
|
||||||
|
}
|
||||||
tbMsg.getCallback().onSuccess();
|
tbMsg.getCallback().onSuccess();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.id.RuleNodeId;
|
|||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.msg.TbActorMsg;
|
import org.thingsboard.server.common.msg.TbActorMsg;
|
||||||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
|
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
|
||||||
|
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
|
||||||
|
|
||||||
public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessageProcessor> {
|
public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessageProcessor> {
|
||||||
|
|
||||||
@ -54,6 +55,7 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
|
|||||||
onStatsPersistTick(id);
|
onStatsPersistTick(id);
|
||||||
break;
|
break;
|
||||||
case PARTITION_CHANGE_MSG:
|
case PARTITION_CHANGE_MSG:
|
||||||
|
onClusterEventMsg((PartitionChangeMsg) msg);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
return false;
|
return false;
|
||||||
|
|||||||
@ -129,10 +129,10 @@ public class DefaultDeviceStateService implements DeviceStateService {
|
|||||||
private volatile boolean clusterUpdatePending = false;
|
private volatile boolean clusterUpdatePending = false;
|
||||||
|
|
||||||
private ListeningScheduledExecutorService queueExecutor;
|
private ListeningScheduledExecutorService queueExecutor;
|
||||||
private ConcurrentMap<TopicPartitionInfo, Set<DeviceId>> partitionedDevices = new ConcurrentHashMap<>();
|
private final ConcurrentMap<TopicPartitionInfo, Set<DeviceId>> partitionedDevices = new ConcurrentHashMap<>();
|
||||||
private ConcurrentMap<DeviceId, DeviceStateData> deviceStates = new ConcurrentHashMap<>();
|
private final ConcurrentMap<DeviceId, DeviceStateData> deviceStates = new ConcurrentHashMap<>();
|
||||||
private ConcurrentMap<DeviceId, Long> deviceLastReportedActivity = new ConcurrentHashMap<>();
|
private final ConcurrentMap<DeviceId, Long> deviceLastReportedActivity = new ConcurrentHashMap<>();
|
||||||
private ConcurrentMap<DeviceId, Long> deviceLastSavedActivity = new ConcurrentHashMap<>();
|
private final ConcurrentMap<DeviceId, Long> deviceLastSavedActivity = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public DefaultDeviceStateService(TenantService tenantService, DeviceService deviceService,
|
public DefaultDeviceStateService(TenantService tenantService, DeviceService deviceService,
|
||||||
AttributesService attributesService, TimeseriesService tsService,
|
AttributesService attributesService, TimeseriesService tsService,
|
||||||
|
|||||||
@ -41,6 +41,6 @@ public interface DeviceStateService extends ApplicationListener<PartitionChangeE
|
|||||||
|
|
||||||
void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout);
|
void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout);
|
||||||
|
|
||||||
void onQueueMsg(TransportProtos.DeviceStateServiceMsgProto serverAddress, TbCallback bytes);
|
void onQueueMsg(TransportProtos.DeviceStateServiceMsgProto proto, TbCallback bytes);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -588,7 +588,7 @@ public class DefaultTransportService implements TransportService {
|
|||||||
@Override
|
@Override
|
||||||
public void onSuccess(TbQueueMsgMetadata metadata) {
|
public void onSuccess(TbQueueMsgMetadata metadata) {
|
||||||
if (msgCount.decrementAndGet() <= 0) {
|
if (msgCount.decrementAndGet() <= 0) {
|
||||||
callback.onSuccess(null);
|
DefaultTransportService.this.transportCallbackExecutor.submit(() -> callback.onSuccess(null));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user