Implementation

This commit is contained in:
Andrii Shvaika 2020-03-17 18:38:09 +02:00
parent 7de485f453
commit 94eb213716
17 changed files with 222 additions and 211 deletions

View File

@ -361,7 +361,6 @@ public class ActorSystemContext {
event.setTenantId(tenantId); event.setTenantId(tenantId);
event.setEntityId(entityId); event.setEntityId(entityId);
event.setType(DataConstants.ERROR); event.setType(DataConstants.ERROR);
//TODO 2.5
event.setBody(toBodyJson(serviceInfoProvider.getServiceInfo().getServiceId(), method, toString(e))); event.setBody(toBodyJson(serviceInfoProvider.getServiceInfo().getServiceId(), method, toString(e)));
persistEvent(event); persistEvent(event);
} }
@ -371,7 +370,6 @@ public class ActorSystemContext {
event.setTenantId(tenantId); event.setTenantId(tenantId);
event.setEntityId(entityId); event.setEntityId(entityId);
event.setType(DataConstants.LC_EVENT); event.setType(DataConstants.LC_EVENT);
//TODO 2.5
event.setBody(toBodyJson(serviceInfoProvider.getServiceInfo().getServiceId(), lcEvent, Optional.ofNullable(e))); event.setBody(toBodyJson(serviceInfoProvider.getServiceInfo().getServiceId(), lcEvent, Optional.ofNullable(e)));
persistEvent(event); persistEvent(event);
} }
@ -403,9 +401,7 @@ public class ActorSystemContext {
public String getServerAddress() { public String getServerAddress() {
//TODO 2.5 return serviceInfoProvider.getServiceId();
// return discoveryService.getCurrentServer().getServerAddress().toString();
return null;
} }
public void persistDebugInput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType) { public void persistDebugInput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType) {

View File

@ -65,6 +65,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseM
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TsKvListProto; import org.thingsboard.server.gen.transport.TransportProtos.TsKvListProto;
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
import org.thingsboard.server.service.queue.TbMsgCallback;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg; import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
@ -225,6 +226,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
void process(ActorContext context, TransportToDeviceActorMsgWrapper wrapper) { void process(ActorContext context, TransportToDeviceActorMsgWrapper wrapper) {
boolean reportDeviceActivity = false; boolean reportDeviceActivity = false;
TransportToDeviceActorMsg msg = wrapper.getMsg(); TransportToDeviceActorMsg msg = wrapper.getMsg();
TbMsgCallback callback = wrapper.getCallback();
if (msg.hasSessionEvent()) { if (msg.hasSessionEvent()) {
processSessionStateMsgs(msg.getSessionInfo(), msg.getSessionEvent()); processSessionStateMsgs(msg.getSessionInfo(), msg.getSessionEvent());
} }
@ -258,6 +260,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
if (reportDeviceActivity) { if (reportDeviceActivity) {
reportLogicalDeviceActivity(); reportLogicalDeviceActivity();
} }
callback.onSuccess();
} }
//TODO 2.5 move this as a notification to the queue; //TODO 2.5 move this as a notification to the queue;

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
@ -58,13 +58,12 @@ public class StatsActor extends ContextAwareActor {
event.setEntityId(msg.getEntityId()); event.setEntityId(msg.getEntityId());
event.setTenantId(msg.getTenantId()); event.setTenantId(msg.getTenantId());
event.setType(DataConstants.STATS); event.setType(DataConstants.STATS);
//TODO 2.5 event.setBody(toBodyJson(systemContext.getServiceInfoProvider().getServiceId(), msg.getMessagesProcessed(), msg.getErrorsOccurred()));
// event.setBody(toBodyJson(systemContext.getDiscoveryService().getCurrentServer().getServerAddress(), msg.getMessagesProcessed(), msg.getErrorsOccurred()));
systemContext.getEventService().save(event); systemContext.getEventService().save(event);
} }
private JsonNode toBodyJson(ServerAddress server, long messagesProcessed, long errorsOccurred) { private JsonNode toBodyJson(String serviceId, long messagesProcessed, long errorsOccurred) {
return mapper.createObjectNode().put("server", server.toString()).put("messagesProcessed", messagesProcessed).put("errorsOccurred", errorsOccurred); return mapper.createObjectNode().put("server", serviceId).put("messagesProcessed", messagesProcessed).put("errorsOccurred", errorsOccurred);
} }
public static class ActorCreator extends ContextBasedCreator<StatsActor> { public static class ActorCreator extends ContextBasedCreator<StatsActor> {

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
@ -29,9 +29,11 @@ import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.TbProtoQueueMsg; import org.thingsboard.server.common.TbProtoQueueMsg;
import org.thingsboard.server.discovery.PartitionChangeEvent; import org.thingsboard.server.discovery.PartitionChangeEvent;
import org.thingsboard.server.discovery.ServiceType; import org.thingsboard.server.discovery.ServiceType;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
import org.thingsboard.server.provider.TbCoreQueueProvider; import org.thingsboard.server.provider.TbCoreQueueProvider;
import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@ -59,14 +61,16 @@ public class DefaultTbCoreConsumerService implements TbCoreConsumerService {
private boolean statsEnabled; private boolean statsEnabled;
private final ActorSystemContext actorContext; private final ActorSystemContext actorContext;
private final DeviceStateService stateService;
private final TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> consumer; private final TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> consumer;
private final TbCoreConsumerStats stats = new TbCoreConsumerStats(); private final TbCoreConsumerStats stats = new TbCoreConsumerStats();
private volatile ExecutorService mainConsumerExecutor; private volatile ExecutorService mainConsumerExecutor;
private volatile boolean stopped = false; private volatile boolean stopped = false;
public DefaultTbCoreConsumerService(TbCoreQueueProvider tbCoreQueueProvider, ActorSystemContext actorContext) { public DefaultTbCoreConsumerService(TbCoreQueueProvider tbCoreQueueProvider, ActorSystemContext actorContext, DeviceStateService stateService) {
this.consumer = tbCoreQueueProvider.getToCoreMsgConsumer(); this.consumer = tbCoreQueueProvider.getToCoreMsgConsumer();
this.actorContext = actorContext; this.actorContext = actorContext;
this.stateService = stateService;
} }
@PostConstruct @PostConstruct
@ -88,7 +92,7 @@ public class DefaultTbCoreConsumerService implements TbCoreConsumerService {
while (!stopped) { while (!stopped) {
try { try {
List<TbProtoQueueMsg<ToCoreMsg>> msgs = consumer.poll(pollDuration); List<TbProtoQueueMsg<ToCoreMsg>> msgs = consumer.poll(pollDuration);
if(msgs.isEmpty()){ if (msgs.isEmpty()) {
continue; continue;
} }
ConcurrentMap<UUID, TbProtoQueueMsg<ToCoreMsg>> ackMap = msgs.stream().collect( ConcurrentMap<UUID, TbProtoQueueMsg<ToCoreMsg>> ackMap = msgs.stream().collect(
@ -98,11 +102,12 @@ public class DefaultTbCoreConsumerService implements TbCoreConsumerService {
TbMsgCallback callback = new MsgPackCallback<>(id, processingTimeoutLatch, ackMap); TbMsgCallback callback = new MsgPackCallback<>(id, processingTimeoutLatch, ackMap);
try { try {
ToCoreMsg toCoreMsg = msg.getValue(); ToCoreMsg toCoreMsg = msg.getValue();
log.trace("Forwarding message to rule engine {}", toCoreMsg);
if (toCoreMsg.hasToDeviceActorMsg()) { if (toCoreMsg.hasToDeviceActorMsg()) {
log.trace("[{}] Forwarding message to device actor {}", id, toCoreMsg.getToDeviceActorMsg());
forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback); forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);
} else { } else if (toCoreMsg.hasDeviceStateServiceMsg()) {
callback.onSuccess(); log.trace("[{}] Forwarding message to state service {}", id, toCoreMsg.getDeviceStateServiceMsg());
forwardToStateService(toCoreMsg.getDeviceStateServiceMsg(), callback);
} }
} catch (Throwable e) { } catch (Throwable e) {
callback.onFailure(e); callback.onFailure(e);
@ -124,6 +129,13 @@ public class DefaultTbCoreConsumerService implements TbCoreConsumerService {
}); });
} }
private void forwardToStateService(TransportProtos.DeviceStateServiceMsgProto deviceStateServiceMsg, TbMsgCallback callback) {
if (statsEnabled) {
stats.log(deviceStateServiceMsg);
}
stateService.onQueueMsg(deviceStateServiceMsg, callback);
}
private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg, TbMsgCallback callback) { private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg, TbMsgCallback callback) {
if (statsEnabled) { if (statsEnabled) {
stats.log(toDeviceActorMsg); stats.log(toDeviceActorMsg);

View File

@ -36,6 +36,7 @@ public class MsgPackCallback<T extends com.google.protobuf.GeneratedMessageV3> i
@Override @Override
public void onSuccess() { public void onSuccess() {
log.trace("[{}] ON SUCCESS", id);
if (ackMap.remove(id) != null && ackMap.isEmpty()) { if (ackMap.remove(id) != null && ackMap.isEmpty()) {
processingTimeoutLatch.countDown(); processingTimeoutLatch.countDown();
} }
@ -43,6 +44,7 @@ public class MsgPackCallback<T extends com.google.protobuf.GeneratedMessageV3> i
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
log.trace("[{}] ON FAILURE", id);
TbProtoQueueMsg<T> message = ackMap.remove(id); TbProtoQueueMsg<T> message = ackMap.remove(id);
log.warn("Failed to process message: {}", message.getValue(), t); log.warn("Failed to process message: {}", message.getValue(), t);
if (ackMap.isEmpty()) { if (ackMap.isEmpty()) {

View File

@ -57,6 +57,10 @@ public class TbCoreConsumerStats {
} }
} }
public void log(TransportProtos.DeviceStateServiceMsgProto deviceStateServiceMsg) {
//TODO 2.5
}
public void printStats() { public void printStats() {
int total = totalCounter.getAndSet(0); int total = totalCounter.getAndSet(0);
if (total > 0) { if (total > 0) {
@ -67,4 +71,5 @@ public class TbCoreConsumerStats {
subscriptionInfoCounter.getAndSet(0), claimDeviceCounter.getAndSet(0)); subscriptionInfoCounter.getAndSet(0), claimDeviceCounter.getAndSet(0));
} }
} }
} }

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
@ -33,6 +33,7 @@ import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.actors.service.ActorService; import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.common.TbProtoQueueMsg;
import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.Tenant;
@ -50,13 +51,19 @@ import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.discovery.PartitionChangeEvent;
import org.thingsboard.server.discovery.PartitionService;
import org.thingsboard.server.discovery.ServiceType;
import org.thingsboard.server.discovery.TopicPartitionInfo;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos; import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.provider.TbCoreQueueProvider;
import org.thingsboard.server.service.queue.TbMsgCallback;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -67,7 +74,6 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
@ -118,6 +124,12 @@ public class DefaultDeviceStateService implements DeviceStateService {
@Lazy @Lazy
private ActorService actorService; private ActorService actorService;
@Autowired
private TbCoreQueueProvider queueProvider;
@Autowired
private PartitionService partitionService;
@Autowired @Autowired
private TelemetrySubscriptionService tsSubService; private TelemetrySubscriptionService tsSubService;
@ -151,7 +163,6 @@ public class DefaultDeviceStateService implements DeviceStateService {
queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("device-state"))); queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("device-state")));
queueExecutor.submit(this::initStateFromDB); queueExecutor.submit(this::initStateFromDB);
queueExecutor.scheduleAtFixedRate(this::updateState, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS); queueExecutor.scheduleAtFixedRate(this::updateState, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
//TODO: schedule persistence in v2.1;
} }
@PreDestroy @PreDestroy
@ -163,38 +174,79 @@ public class DefaultDeviceStateService implements DeviceStateService {
@Override @Override
public void onDeviceAdded(Device device) { public void onDeviceAdded(Device device) {
queueExecutor.submit(() -> onDeviceAddedSync(device)); sendDeviceEvent(device.getTenantId(), device.getId(), true, false, false);
} }
@Override @Override
public void onDeviceUpdated(Device device) { public void onDeviceUpdated(Device device) {
queueExecutor.submit(() -> onDeviceUpdatedSync(device)); sendDeviceEvent(device.getTenantId(), device.getId(), false, true, false);
}
@Override
public void onDeviceDeleted(Device device) {
sendDeviceEvent(device.getTenantId(), device.getId(), false, false, true);
} }
@Override @Override
public void onDeviceConnect(DeviceId deviceId) { public void onDeviceConnect(DeviceId deviceId) {
queueExecutor.submit(() -> onDeviceConnectSync(deviceId)); DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
if (stateData != null) {
long ts = System.currentTimeMillis();
stateData.getState().setLastConnectTime(ts);
pushRuleEngineMessage(stateData, CONNECT_EVENT);
save(deviceId, LAST_CONNECT_TIME, ts);
}
} }
@Override @Override
public void onDeviceActivity(DeviceId deviceId) { public void onDeviceActivity(DeviceId deviceId) {
deviceLastReportedActivity.put(deviceId, System.currentTimeMillis()); deviceLastReportedActivity.put(deviceId, System.currentTimeMillis());
queueExecutor.submit(() -> onDeviceActivitySync(deviceId)); long lastReportedActivity = deviceLastReportedActivity.getOrDefault(deviceId, 0L);
long lastSavedActivity = deviceLastSavedActivity.getOrDefault(deviceId, 0L);
if (lastReportedActivity > 0 && lastReportedActivity > lastSavedActivity) {
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
if (stateData != null) {
DeviceState state = stateData.getState();
stateData.getState().setLastActivityTime(lastReportedActivity);
stateData.getMetaData().putValue("scope", SERVER_SCOPE);
pushRuleEngineMessage(stateData, ACTIVITY_EVENT);
save(deviceId, LAST_ACTIVITY_TIME, lastReportedActivity);
deviceLastSavedActivity.put(deviceId, lastReportedActivity);
if (!state.isActive()) {
state.setActive(true);
save(deviceId, ACTIVITY_STATE, state.isActive());
}
}
}
} }
@Override @Override
public void onDeviceDisconnect(DeviceId deviceId) { public void onDeviceDisconnect(DeviceId deviceId) {
queueExecutor.submit(() -> onDeviceDisconnectSync(deviceId)); DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
} if (stateData != null) {
long ts = System.currentTimeMillis();
@Override stateData.getState().setLastDisconnectTime(ts);
public void onDeviceDeleted(Device device) { pushRuleEngineMessage(stateData, DISCONNECT_EVENT);
queueExecutor.submit(() -> onDeviceDeleted(device.getTenantId(), device.getId())); save(deviceId, LAST_DISCONNECT_TIME, ts);
}
} }
@Override @Override
public void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) { public void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) {
queueExecutor.submit(() -> onInactivityTimeoutUpdate(deviceId, inactivityTimeout)); if (inactivityTimeout == 0L) {
return;
}
DeviceStateData stateData = deviceStates.get(deviceId);
if (stateData != null) {
long ts = System.currentTimeMillis();
DeviceState state = stateData.getState();
state.setInactivityTimeout(inactivityTimeout);
boolean oldActive = state.isActive();
state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
if (!oldActive && state.isActive() || oldActive && !state.isActive()) {
save(deviceId, ACTIVITY_STATE, state.isActive());
}
}
} }
@Override @Override
@ -206,26 +258,56 @@ public class DefaultDeviceStateService implements DeviceStateService {
} }
@Override @Override
public void onRemoteMsg(ServerAddress serverAddress, byte[] data) { public void onQueueMsg(TransportProtos.DeviceStateServiceMsgProto proto, TbMsgCallback callback) {
ClusterAPIProtos.DeviceStateServiceMsgProto proto;
try { try {
proto = ClusterAPIProtos.DeviceStateServiceMsgProto.parseFrom(data); TenantId tenantId = new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB()));
} catch (InvalidProtocolBufferException e) { DeviceId deviceId = new DeviceId(new UUID(proto.getDeviceIdMSB(), proto.getDeviceIdLSB()));
throw new RuntimeException(e); if (proto.getDeleted()) {
} onDeviceDeleted(tenantId, deviceId);
TenantId tenantId = new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())); callback.onSuccess();
DeviceId deviceId = new DeviceId(new UUID(proto.getDeviceIdMSB(), proto.getDeviceIdLSB())); } else {
if (proto.getDeleted()) { Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, deviceId);
queueExecutor.submit(() -> onDeviceDeleted(tenantId, deviceId)); if (device != null) {
} else { if (proto.getAdded()) {
Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, deviceId); Futures.addCallback(fetchDeviceState(device), new FutureCallback<DeviceStateData>() {
if (device != null) { @Override
if (proto.getAdded()) { public void onSuccess(@Nullable DeviceStateData state) {
onDeviceAdded(device); addDeviceUsingState(state);
} else if (proto.getUpdated()) { callback.onSuccess();
onDeviceUpdated(device); }
@Override
public void onFailure(Throwable t) {
log.warn("Failed to register device to the state service", t);
callback.onFailure(t);
}
});
} else if (proto.getUpdated()) {
DeviceStateData stateData = getOrFetchDeviceStateData(device.getId());
if (stateData != null) {
TbMsgMetaData md = new TbMsgMetaData();
md.putValue("deviceName", device.getName());
md.putValue("deviceType", device.getType());
stateData.setMetaData(md);
}
}
} else {
//Device was probably deleted while message was in queue;
callback.onSuccess();
} }
} }
} catch (Exception e) {
log.trace("Failed to process queue msg: [{}]", proto, e);
callback.onFailure(e);
}
}
@Override
public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceKey().getServiceType())) {
repartition(partitionChangeEvent.getPartitions());
} }
} }
@ -241,9 +323,9 @@ public class DefaultDeviceStateService implements DeviceStateService {
for (Device device : page.getData()) { for (Device device : page.getData()) {
//TODO 2.5 //TODO 2.5
// if (!routingService.resolveById(device.getId()).isPresent()) { // if (!routingService.resolveById(device.getId()).isPresent()) {
if (!deviceStates.containsKey(device.getId())) { if (!deviceStates.containsKey(device.getId())) {
fetchFutures.add(fetchDeviceState(device)); fetchFutures.add(fetchDeviceState(device));
} }
// } else { // } else {
// Set<DeviceId> tenantDeviceSet = tenantDevices.get(tenant.getId()); // Set<DeviceId> tenantDeviceSet = tenantDevices.get(tenant.getId());
// if (tenantDeviceSet != null) { // if (tenantDeviceSet != null) {
@ -275,7 +357,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
for (Device device : page.getData()) { for (Device device : page.getData()) {
//TODO 2.5 //TODO 2.5
// if (!routingService.resolveById(device.getId()).isPresent()) { // if (!routingService.resolveById(device.getId()).isPresent()) {
fetchFutures.add(fetchDeviceState(device)); fetchFutures.add(fetchDeviceState(device));
// } // }
} }
try { try {
@ -319,105 +401,29 @@ public class DefaultDeviceStateService implements DeviceStateService {
} }
} }
private void onDeviceConnectSync(DeviceId deviceId) {
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
if (stateData != null) {
long ts = System.currentTimeMillis();
stateData.getState().setLastConnectTime(ts);
pushRuleEngineMessage(stateData, CONNECT_EVENT);
save(deviceId, LAST_CONNECT_TIME, ts);
}
}
private void onDeviceDisconnectSync(DeviceId deviceId) {
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
if (stateData != null) {
long ts = System.currentTimeMillis();
stateData.getState().setLastDisconnectTime(ts);
pushRuleEngineMessage(stateData, DISCONNECT_EVENT);
save(deviceId, LAST_DISCONNECT_TIME, ts);
}
}
private void onDeviceActivitySync(DeviceId deviceId) {
long lastReportedActivity = deviceLastReportedActivity.getOrDefault(deviceId, 0L);
long lastSavedActivity = deviceLastSavedActivity.getOrDefault(deviceId, 0L);
if (lastReportedActivity > 0 && lastReportedActivity > lastSavedActivity) {
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
if (stateData != null) {
DeviceState state = stateData.getState();
stateData.getState().setLastActivityTime(lastReportedActivity);
stateData.getMetaData().putValue("scope", SERVER_SCOPE);
pushRuleEngineMessage(stateData, ACTIVITY_EVENT);
save(deviceId, LAST_ACTIVITY_TIME, lastReportedActivity);
deviceLastSavedActivity.put(deviceId, lastReportedActivity);
if (!state.isActive()) {
state.setActive(true);
save(deviceId, ACTIVITY_STATE, state.isActive());
}
}
}
}
private DeviceStateData getOrFetchDeviceStateData(DeviceId deviceId) { private DeviceStateData getOrFetchDeviceStateData(DeviceId deviceId) {
DeviceStateData deviceStateData = deviceStates.get(deviceId); DeviceStateData deviceStateData = deviceStates.get(deviceId);
if (deviceStateData == null) { if (deviceStateData == null) {
//TODO 2.5 //TODO 2.5
// if (!routingService.resolveById(deviceId).isPresent()) { // if (!routingService.resolveById(deviceId).isPresent()) {
Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, deviceId); Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, deviceId);
if (device != null) { if (device != null) {
try { try {
deviceStateData = fetchDeviceState(device).get(); deviceStateData = fetchDeviceState(device).get();
deviceStates.putIfAbsent(deviceId, deviceStateData); deviceStates.putIfAbsent(deviceId, deviceStateData);
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
log.debug("[{}] Failed to fetch device state!", deviceId, e); log.debug("[{}] Failed to fetch device state!", deviceId, e);
}
} }
}
// } // }
} }
return deviceStateData; return deviceStateData;
} }
private void onInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) { private void sendDeviceEvent(TenantId tenantId, DeviceId deviceId, boolean added, boolean updated, boolean deleted) {
if (inactivityTimeout == 0L) { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
return; log.trace("[{}][{}] Device is monitored on partition: {}", tenantId, deviceId, tpi);
} TransportProtos.DeviceStateServiceMsgProto.Builder builder = TransportProtos.DeviceStateServiceMsgProto.newBuilder();
DeviceStateData stateData = deviceStates.get(deviceId);
if (stateData != null) {
long ts = System.currentTimeMillis();
DeviceState state = stateData.getState();
state.setInactivityTimeout(inactivityTimeout);
boolean oldActive = state.isActive();
state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
if (!oldActive && state.isActive() || oldActive && !state.isActive()) {
save(deviceId, ACTIVITY_STATE, state.isActive());
}
}
}
private void onDeviceAddedSync(Device device) {
//TODO 2.5
// Optional<ServerAddress> address = routingService.resolveById(device.getId());
// if (!address.isPresent()) {
Futures.addCallback(fetchDeviceState(device), new FutureCallback<DeviceStateData>() {
@Override
public void onSuccess(@Nullable DeviceStateData state) {
addDeviceUsingState(state);
}
@Override
public void onFailure(Throwable t) {
log.warn("Failed to register device to the state service", t);
}
});
// } else {
// sendDeviceEvent(device.getTenantId(), device.getId(), address.get(), true, false, false);
// }
}
private void sendDeviceEvent(TenantId tenantId, DeviceId deviceId, ServerAddress address, boolean added, boolean updated, boolean deleted) {
log.trace("[{}][{}] Device is monitored on other server: {}", tenantId, deviceId, address);
ClusterAPIProtos.DeviceStateServiceMsgProto.Builder builder = ClusterAPIProtos.DeviceStateServiceMsgProto.newBuilder();
builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()); builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
builder.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()); builder.setDeviceIdMSB(deviceId.getId().getMostSignificantBits());
@ -425,43 +431,22 @@ public class DefaultDeviceStateService implements DeviceStateService {
builder.setAdded(added); builder.setAdded(added);
builder.setUpdated(updated); builder.setUpdated(updated);
builder.setDeleted(deleted); builder.setDeleted(deleted);
//TODO 2.5 TransportProtos.DeviceStateServiceMsgProto msg = builder.build();
// clusterRpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_DEVICE_STATE_SERVICE_MESSAGE, builder.build().toByteArray()); queueProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(deviceId.getId(),
} TransportProtos.ToCoreMsg.newBuilder().setDeviceStateServiceMsg(msg).build()), null);
private void onDeviceUpdatedSync(Device device) {
//TODO 2.5
// Optional<ServerAddress> address = routingService.resolveById(device.getId());
// if (!address.isPresent()) {
DeviceStateData stateData = getOrFetchDeviceStateData(device.getId());
if (stateData != null) {
TbMsgMetaData md = new TbMsgMetaData();
md.putValue("deviceName", device.getName());
md.putValue("deviceType", device.getType());
stateData.setMetaData(md);
}
// } else {
// sendDeviceEvent(device.getTenantId(), device.getId(), address.get(), false, true, false);
// }
} }
private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) { private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
//TODO 2.5 deviceStates.remove(deviceId);
// Optional<ServerAddress> address = routingService.resolveById(deviceId); deviceLastReportedActivity.remove(deviceId);
// if (!address.isPresent()) { deviceLastSavedActivity.remove(deviceId);
deviceStates.remove(deviceId); Set<DeviceId> deviceIds = tenantDevices.get(tenantId);
deviceLastReportedActivity.remove(deviceId); if (deviceIds != null) {
deviceLastSavedActivity.remove(deviceId); deviceIds.remove(deviceId);
Set<DeviceId> deviceIds = tenantDevices.get(tenantId); if (deviceIds.isEmpty()) {
if (deviceIds != null) { tenantDevices.remove(tenantId);
deviceIds.remove(deviceId);
if (deviceIds.isEmpty()) {
tenantDevices.remove(tenantId);
}
} }
// } else { }
// sendDeviceEvent(tenantId, deviceId, address.get(), false, false, true);
// }
} }
private ListenableFuture<DeviceStateData> fetchDeviceState(Device device) { private ListenableFuture<DeviceStateData> fetchDeviceState(Device device) {

View File

@ -15,14 +15,17 @@
*/ */
package org.thingsboard.server.service.state; package org.thingsboard.server.service.state;
import org.springframework.context.ApplicationListener;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.discovery.PartitionChangeEvent;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.service.queue.TbMsgCallback;
/** /**
* Created by ashvayka on 01.05.18. * Created by ashvayka on 01.05.18.
*/ */
public interface DeviceStateService { public interface DeviceStateService extends ApplicationListener<PartitionChangeEvent> {
void onDeviceAdded(Device device); void onDeviceAdded(Device device);
@ -38,7 +41,6 @@ public interface DeviceStateService {
void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout); void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout);
void onClusterUpdate(); void onQueueMsg(TransportProtos.DeviceStateServiceMsgProto serverAddress, TbMsgCallback bytes);
void onRemoteMsg(ServerAddress serverAddress, byte[] bytes);
} }

View File

@ -134,13 +134,3 @@ message FromDeviceRPCResponseProto {
string response = 3; string response = 3;
int32 error = 4; int32 error = 4;
} }
message DeviceStateServiceMsgProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
int64 deviceIdMSB = 3;
int64 deviceIdLSB = 4;
bool added = 5;
bool updated = 6;
bool deleted = 7;
}

View File

@ -27,6 +27,8 @@
<logger name="org.thingsboard.server" level="INFO" /> <logger name="org.thingsboard.server" level="INFO" />
<logger name="akka" level="INFO" /> <logger name="akka" level="INFO" />
<logger name="org.thingsboard.server.service.queue" level="TRACE" />
<logger name="org.thingsboard.server.service.transport" level="TRACE" />
<root level="INFO"> <root level="INFO">
<appender-ref ref="STDOUT"/> <appender-ref ref="STDOUT"/>

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
@ -46,16 +46,13 @@ public class AbstractTbQueueTemplate {
return new String(data, StandardCharsets.UTF_8); return new String(data, StandardCharsets.UTF_8);
} }
private static ByteBuffer longBuffer = ByteBuffer.allocate(Long.BYTES);
protected static byte[] longToBytes(long x) { protected static byte[] longToBytes(long x) {
ByteBuffer longBuffer = ByteBuffer.allocate(Long.BYTES);
longBuffer.putLong(0, x); longBuffer.putLong(0, x);
return longBuffer.array(); return longBuffer.array();
} }
protected static long bytesToLong(byte[] bytes) { protected static long bytesToLong(byte[] bytes) {
longBuffer.put(bytes, 0, bytes.length); return ByteBuffer.wrap(bytes).getLong();
longBuffer.flip();//need flip
return longBuffer.getLong();
} }
} }

View File

@ -101,6 +101,7 @@ public class ConsistentHashPartitionService implements PartitionService {
return topicPartitions; return topicPartitions;
} }
//TODO 2.5 This should return cached TopicPartitionInfo objects instead of creating new one every time.
@Override @Override
public TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId) { public TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId) {
boolean isolated = isolatedTenants.get(tenantId) != null && isolatedTenants.get(tenantId).contains(serviceType); boolean isolated = isolatedTenants.get(tenantId) != null && isolatedTenants.get(tenantId).contains(serviceType);

View File

@ -15,6 +15,6 @@
*/ */
package org.thingsboard.server.discovery; package org.thingsboard.server.discovery;
public interface PartitionDiscoveryService { public interface DiscoveryService {
} }

View File

@ -28,7 +28,7 @@ import java.util.Collections;
@ConditionalOnProperty(prefix = "zk", value = "enabled", havingValue = "false", matchIfMissing = true) @ConditionalOnProperty(prefix = "zk", value = "enabled", havingValue = "false", matchIfMissing = true)
@Slf4j @Slf4j
@DependsOn("environmentLogService") @DependsOn("environmentLogService")
public class DummyDiscoveryService implements PartitionDiscoveryService { public class DummyDiscoveryService implements DiscoveryService {
private final TbServiceInfoProvider serviceInfoProvider; private final TbServiceInfoProvider serviceInfoProvider;
private final PartitionService partitionService; private final PartitionService partitionService;

View File

@ -16,7 +16,6 @@
package org.thingsboard.server.discovery; package org.thingsboard.server.discovery;
import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.InvalidProtocolBufferException;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.CuratorFrameworkFactory;
@ -31,7 +30,6 @@ import org.apache.curator.retry.RetryForever;
import org.apache.curator.utils.CloseableUtils; import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.boot.context.event.ApplicationReadyEvent;
@ -39,18 +37,12 @@ import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -60,7 +52,7 @@ import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.
@Service @Service
@ConditionalOnProperty(prefix = "zk", value = "enabled", havingValue = "true", matchIfMissing = false) @ConditionalOnProperty(prefix = "zk", value = "enabled", havingValue = "true", matchIfMissing = false)
@Slf4j @Slf4j
public class ZkPartitionDiscoveryService implements PartitionDiscoveryService, PathChildrenCacheListener { public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheListener {
@Value("${zk.url}") @Value("${zk.url}")
private String zkUrl; private String zkUrl;
@ -84,7 +76,7 @@ public class ZkPartitionDiscoveryService implements PartitionDiscoveryService, P
private volatile boolean stopped = true; private volatile boolean stopped = true;
public ZkPartitionDiscoveryService(TbServiceInfoProvider serviceInfoProvider, PartitionService partitionService) { public ZkDiscoveryService(TbServiceInfoProvider serviceInfoProvider, PartitionService partitionService) {
this.serviceInfoProvider = serviceInfoProvider; this.serviceInfoProvider = serviceInfoProvider;
this.partitionService = partitionService; this.partitionService = partitionService;
} }

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
@ -45,6 +45,8 @@ public class KafkaMonolithQueueProvider implements TbCoreQueueProvider, TbRuleEn
private final TbQueueTransportApiSettings transportApiSettings; private final TbQueueTransportApiSettings transportApiSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings; private final TbQueueTransportNotificationSettings transportNotificationSettings;
private TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> tbCoreProducer;
public KafkaMonolithQueueProvider(TbKafkaSettings kafkaSettings, public KafkaMonolithQueueProvider(TbKafkaSettings kafkaSettings,
TbNodeIdProvider nodeIdProvider, TbNodeIdProvider nodeIdProvider,
TbQueueCoreSettings coreSettings, TbQueueCoreSettings coreSettings,
@ -77,13 +79,21 @@ public class KafkaMonolithQueueProvider implements TbCoreQueueProvider, TbRuleEn
return requestBuilder.build(); return requestBuilder.build();
} }
//TODO 2.5 Singleton
@Override @Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> getTbCoreMsgProducer() { public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> getTbCoreMsgProducer() {
TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCoreMsg>> requestBuilder = TBKafkaProducerTemplate.builder(); if (tbCoreProducer == null) {
requestBuilder.settings(kafkaSettings); synchronized (this) {
requestBuilder.clientId("producer-core-" + nodeIdProvider.getNodeId()); if (tbCoreProducer == null) {
requestBuilder.defaultTopic(coreSettings.getTopic()); TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCoreMsg>> requestBuilder = TBKafkaProducerTemplate.builder();
return requestBuilder.build(); requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("producer-core-" + nodeIdProvider.getNodeId());
requestBuilder.defaultTopic(coreSettings.getTopic());
tbCoreProducer = requestBuilder.build();
}
}
}
return tbCoreProducer;
} }
@Override @Override

View File

@ -239,6 +239,20 @@ message DeviceActorToTransportMsg {
ToServerRpcResponseMsg toServerResponse = 7; ToServerRpcResponseMsg toServerResponse = 7;
} }
/**
* TB Core to TB Core messages
*/
message DeviceStateServiceMsgProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
int64 deviceIdMSB = 3;
int64 deviceIdLSB = 4;
bool added = 5;
bool updated = 6;
bool deleted = 7;
}
/** /**
* Main messages; * Main messages;
*/ */
@ -259,6 +273,7 @@ message TransportApiResponseMsg {
/* Messages that are handled by ThingsBoard Core Service */ /* Messages that are handled by ThingsBoard Core Service */
message ToCoreMsg { message ToCoreMsg {
TransportToDeviceActorMsg toDeviceActorMsg = 1; TransportToDeviceActorMsg toDeviceActorMsg = 1;
DeviceStateServiceMsgProto deviceStateServiceMsg = 2;
} }
/* Messages that are handled by ThingsBoard RuleEngine Service */ /* Messages that are handled by ThingsBoard RuleEngine Service */