diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 3db5c82ee2..6b8e950fdb 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -361,7 +361,6 @@ public class ActorSystemContext { event.setTenantId(tenantId); event.setEntityId(entityId); event.setType(DataConstants.ERROR); - //TODO 2.5 event.setBody(toBodyJson(serviceInfoProvider.getServiceInfo().getServiceId(), method, toString(e))); persistEvent(event); } @@ -371,7 +370,6 @@ public class ActorSystemContext { event.setTenantId(tenantId); event.setEntityId(entityId); event.setType(DataConstants.LC_EVENT); - //TODO 2.5 event.setBody(toBodyJson(serviceInfoProvider.getServiceInfo().getServiceId(), lcEvent, Optional.ofNullable(e))); persistEvent(event); } @@ -403,9 +401,7 @@ public class ActorSystemContext { public String getServerAddress() { - //TODO 2.5 -// return discoveryService.getCurrentServer().getServerAddress().toString(); - return null; + return serviceInfoProvider.getServiceId(); } public void persistDebugInput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType) { diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 6b878ef7d8..6b5dfbf6c3 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -65,6 +65,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseM import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.gen.transport.TransportProtos.TsKvListProto; import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; +import org.thingsboard.server.service.queue.TbMsgCallback; import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg; @@ -225,6 +226,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { void process(ActorContext context, TransportToDeviceActorMsgWrapper wrapper) { boolean reportDeviceActivity = false; TransportToDeviceActorMsg msg = wrapper.getMsg(); + TbMsgCallback callback = wrapper.getCallback(); if (msg.hasSessionEvent()) { processSessionStateMsgs(msg.getSessionInfo(), msg.getSessionEvent()); } @@ -258,6 +260,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { if (reportDeviceActivity) { reportLogicalDeviceActivity(); } + callback.onSuccess(); } //TODO 2.5 move this as a notification to the queue; diff --git a/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java b/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java index c8e55bf9ed..8757deb2b4 100644 --- a/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -58,13 +58,12 @@ public class StatsActor extends ContextAwareActor { event.setEntityId(msg.getEntityId()); event.setTenantId(msg.getTenantId()); event.setType(DataConstants.STATS); - //TODO 2.5 -// event.setBody(toBodyJson(systemContext.getDiscoveryService().getCurrentServer().getServerAddress(), msg.getMessagesProcessed(), msg.getErrorsOccurred())); + event.setBody(toBodyJson(systemContext.getServiceInfoProvider().getServiceId(), msg.getMessagesProcessed(), msg.getErrorsOccurred())); systemContext.getEventService().save(event); } - private JsonNode toBodyJson(ServerAddress server, long messagesProcessed, long errorsOccurred) { - return mapper.createObjectNode().put("server", server.toString()).put("messagesProcessed", messagesProcessed).put("errorsOccurred", errorsOccurred); + private JsonNode toBodyJson(String serviceId, long messagesProcessed, long errorsOccurred) { + return mapper.createObjectNode().put("server", serviceId).put("messagesProcessed", messagesProcessed).put("errorsOccurred", errorsOccurred); } public static class ActorCreator extends ContextBasedCreator { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 1d3cf11f40..c95599f2ad 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -29,9 +29,11 @@ import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.TbProtoQueueMsg; import org.thingsboard.server.discovery.PartitionChangeEvent; import org.thingsboard.server.discovery.ServiceType; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.provider.TbCoreQueueProvider; +import org.thingsboard.server.service.state.DeviceStateService; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; import javax.annotation.PostConstruct; @@ -59,14 +61,16 @@ public class DefaultTbCoreConsumerService implements TbCoreConsumerService { private boolean statsEnabled; private final ActorSystemContext actorContext; + private final DeviceStateService stateService; private final TbQueueConsumer> consumer; private final TbCoreConsumerStats stats = new TbCoreConsumerStats(); private volatile ExecutorService mainConsumerExecutor; private volatile boolean stopped = false; - public DefaultTbCoreConsumerService(TbCoreQueueProvider tbCoreQueueProvider, ActorSystemContext actorContext) { + public DefaultTbCoreConsumerService(TbCoreQueueProvider tbCoreQueueProvider, ActorSystemContext actorContext, DeviceStateService stateService) { this.consumer = tbCoreQueueProvider.getToCoreMsgConsumer(); this.actorContext = actorContext; + this.stateService = stateService; } @PostConstruct @@ -88,7 +92,7 @@ public class DefaultTbCoreConsumerService implements TbCoreConsumerService { while (!stopped) { try { List> msgs = consumer.poll(pollDuration); - if(msgs.isEmpty()){ + if (msgs.isEmpty()) { continue; } ConcurrentMap> ackMap = msgs.stream().collect( @@ -98,11 +102,12 @@ public class DefaultTbCoreConsumerService implements TbCoreConsumerService { TbMsgCallback callback = new MsgPackCallback<>(id, processingTimeoutLatch, ackMap); try { ToCoreMsg toCoreMsg = msg.getValue(); - log.trace("Forwarding message to rule engine {}", toCoreMsg); if (toCoreMsg.hasToDeviceActorMsg()) { + log.trace("[{}] Forwarding message to device actor {}", id, toCoreMsg.getToDeviceActorMsg()); forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback); - } else { - callback.onSuccess(); + } else if (toCoreMsg.hasDeviceStateServiceMsg()) { + log.trace("[{}] Forwarding message to state service {}", id, toCoreMsg.getDeviceStateServiceMsg()); + forwardToStateService(toCoreMsg.getDeviceStateServiceMsg(), callback); } } catch (Throwable e) { callback.onFailure(e); @@ -124,6 +129,13 @@ public class DefaultTbCoreConsumerService implements TbCoreConsumerService { }); } + private void forwardToStateService(TransportProtos.DeviceStateServiceMsgProto deviceStateServiceMsg, TbMsgCallback callback) { + if (statsEnabled) { + stats.log(deviceStateServiceMsg); + } + stateService.onQueueMsg(deviceStateServiceMsg, callback); + } + private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg, TbMsgCallback callback) { if (statsEnabled) { stats.log(toDeviceActorMsg); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/MsgPackCallback.java b/application/src/main/java/org/thingsboard/server/service/queue/MsgPackCallback.java index 5b94db28b0..d7eebd98b0 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/MsgPackCallback.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/MsgPackCallback.java @@ -36,6 +36,7 @@ public class MsgPackCallback i @Override public void onSuccess() { + log.trace("[{}] ON SUCCESS", id); if (ackMap.remove(id) != null && ackMap.isEmpty()) { processingTimeoutLatch.countDown(); } @@ -43,6 +44,7 @@ public class MsgPackCallback i @Override public void onFailure(Throwable t) { + log.trace("[{}] ON FAILURE", id); TbProtoQueueMsg message = ackMap.remove(id); log.warn("Failed to process message: {}", message.getValue(), t); if (ackMap.isEmpty()) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java index f7912836c5..f2956ea747 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java @@ -57,6 +57,10 @@ public class TbCoreConsumerStats { } } + public void log(TransportProtos.DeviceStateServiceMsgProto deviceStateServiceMsg) { + //TODO 2.5 + } + public void printStats() { int total = totalCounter.getAndSet(0); if (total > 0) { @@ -67,4 +71,5 @@ public class TbCoreConsumerStats { subscriptionInfoCounter.getAndSet(0), claimDeviceCounter.getAndSet(0)); } } + } diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index 31b712644d..8d82b90fb6 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -33,6 +33,7 @@ import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.actors.service.ActorService; +import org.thingsboard.server.common.TbProtoQueueMsg; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Tenant; @@ -50,13 +51,19 @@ import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; -import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; +import org.thingsboard.server.discovery.PartitionChangeEvent; +import org.thingsboard.server.discovery.PartitionService; +import org.thingsboard.server.discovery.ServiceType; +import org.thingsboard.server.discovery.TopicPartitionInfo; import org.thingsboard.server.gen.cluster.ClusterAPIProtos; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.provider.TbCoreQueueProvider; +import org.thingsboard.server.service.queue.TbMsgCallback; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import javax.annotation.Nullable; @@ -67,7 +74,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.UUID; @@ -118,6 +124,12 @@ public class DefaultDeviceStateService implements DeviceStateService { @Lazy private ActorService actorService; + @Autowired + private TbCoreQueueProvider queueProvider; + + @Autowired + private PartitionService partitionService; + @Autowired private TelemetrySubscriptionService tsSubService; @@ -151,7 +163,6 @@ public class DefaultDeviceStateService implements DeviceStateService { queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("device-state"))); queueExecutor.submit(this::initStateFromDB); queueExecutor.scheduleAtFixedRate(this::updateState, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS); - //TODO: schedule persistence in v2.1; } @PreDestroy @@ -163,38 +174,79 @@ public class DefaultDeviceStateService implements DeviceStateService { @Override public void onDeviceAdded(Device device) { - queueExecutor.submit(() -> onDeviceAddedSync(device)); + sendDeviceEvent(device.getTenantId(), device.getId(), true, false, false); } @Override public void onDeviceUpdated(Device device) { - queueExecutor.submit(() -> onDeviceUpdatedSync(device)); + sendDeviceEvent(device.getTenantId(), device.getId(), false, true, false); + } + + @Override + public void onDeviceDeleted(Device device) { + sendDeviceEvent(device.getTenantId(), device.getId(), false, false, true); } @Override public void onDeviceConnect(DeviceId deviceId) { - queueExecutor.submit(() -> onDeviceConnectSync(deviceId)); + DeviceStateData stateData = getOrFetchDeviceStateData(deviceId); + if (stateData != null) { + long ts = System.currentTimeMillis(); + stateData.getState().setLastConnectTime(ts); + pushRuleEngineMessage(stateData, CONNECT_EVENT); + save(deviceId, LAST_CONNECT_TIME, ts); + } } @Override public void onDeviceActivity(DeviceId deviceId) { deviceLastReportedActivity.put(deviceId, System.currentTimeMillis()); - queueExecutor.submit(() -> onDeviceActivitySync(deviceId)); + long lastReportedActivity = deviceLastReportedActivity.getOrDefault(deviceId, 0L); + long lastSavedActivity = deviceLastSavedActivity.getOrDefault(deviceId, 0L); + if (lastReportedActivity > 0 && lastReportedActivity > lastSavedActivity) { + DeviceStateData stateData = getOrFetchDeviceStateData(deviceId); + if (stateData != null) { + DeviceState state = stateData.getState(); + stateData.getState().setLastActivityTime(lastReportedActivity); + stateData.getMetaData().putValue("scope", SERVER_SCOPE); + pushRuleEngineMessage(stateData, ACTIVITY_EVENT); + save(deviceId, LAST_ACTIVITY_TIME, lastReportedActivity); + deviceLastSavedActivity.put(deviceId, lastReportedActivity); + if (!state.isActive()) { + state.setActive(true); + save(deviceId, ACTIVITY_STATE, state.isActive()); + } + } + } } @Override public void onDeviceDisconnect(DeviceId deviceId) { - queueExecutor.submit(() -> onDeviceDisconnectSync(deviceId)); - } - - @Override - public void onDeviceDeleted(Device device) { - queueExecutor.submit(() -> onDeviceDeleted(device.getTenantId(), device.getId())); + DeviceStateData stateData = getOrFetchDeviceStateData(deviceId); + if (stateData != null) { + long ts = System.currentTimeMillis(); + stateData.getState().setLastDisconnectTime(ts); + pushRuleEngineMessage(stateData, DISCONNECT_EVENT); + save(deviceId, LAST_DISCONNECT_TIME, ts); + } } @Override public void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) { - queueExecutor.submit(() -> onInactivityTimeoutUpdate(deviceId, inactivityTimeout)); + if (inactivityTimeout == 0L) { + return; + } + DeviceStateData stateData = deviceStates.get(deviceId); + if (stateData != null) { + long ts = System.currentTimeMillis(); + DeviceState state = stateData.getState(); + state.setInactivityTimeout(inactivityTimeout); + boolean oldActive = state.isActive(); + state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout()); + if (!oldActive && state.isActive() || oldActive && !state.isActive()) { + save(deviceId, ACTIVITY_STATE, state.isActive()); + } + } } @Override @@ -206,26 +258,56 @@ public class DefaultDeviceStateService implements DeviceStateService { } @Override - public void onRemoteMsg(ServerAddress serverAddress, byte[] data) { - ClusterAPIProtos.DeviceStateServiceMsgProto proto; + public void onQueueMsg(TransportProtos.DeviceStateServiceMsgProto proto, TbMsgCallback callback) { try { - proto = ClusterAPIProtos.DeviceStateServiceMsgProto.parseFrom(data); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); - } - TenantId tenantId = new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())); - DeviceId deviceId = new DeviceId(new UUID(proto.getDeviceIdMSB(), proto.getDeviceIdLSB())); - if (proto.getDeleted()) { - queueExecutor.submit(() -> onDeviceDeleted(tenantId, deviceId)); - } else { - Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, deviceId); - if (device != null) { - if (proto.getAdded()) { - onDeviceAdded(device); - } else if (proto.getUpdated()) { - onDeviceUpdated(device); + TenantId tenantId = new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())); + DeviceId deviceId = new DeviceId(new UUID(proto.getDeviceIdMSB(), proto.getDeviceIdLSB())); + if (proto.getDeleted()) { + onDeviceDeleted(tenantId, deviceId); + callback.onSuccess(); + } else { + Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, deviceId); + if (device != null) { + if (proto.getAdded()) { + Futures.addCallback(fetchDeviceState(device), new FutureCallback() { + @Override + public void onSuccess(@Nullable DeviceStateData state) { + addDeviceUsingState(state); + callback.onSuccess(); + } + + @Override + public void onFailure(Throwable t) { + log.warn("Failed to register device to the state service", t); + callback.onFailure(t); + } + }); + } else if (proto.getUpdated()) { + DeviceStateData stateData = getOrFetchDeviceStateData(device.getId()); + if (stateData != null) { + TbMsgMetaData md = new TbMsgMetaData(); + md.putValue("deviceName", device.getName()); + md.putValue("deviceType", device.getType()); + stateData.setMetaData(md); + } + } + } else { + //Device was probably deleted while message was in queue; + callback.onSuccess(); } } + + } catch (Exception e) { + log.trace("Failed to process queue msg: [{}]", proto, e); + callback.onFailure(e); + } + + } + + @Override + public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) { + if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceKey().getServiceType())) { + repartition(partitionChangeEvent.getPartitions()); } } @@ -241,9 +323,9 @@ public class DefaultDeviceStateService implements DeviceStateService { for (Device device : page.getData()) { //TODO 2.5 // if (!routingService.resolveById(device.getId()).isPresent()) { - if (!deviceStates.containsKey(device.getId())) { - fetchFutures.add(fetchDeviceState(device)); - } + if (!deviceStates.containsKey(device.getId())) { + fetchFutures.add(fetchDeviceState(device)); + } // } else { // Set tenantDeviceSet = tenantDevices.get(tenant.getId()); // if (tenantDeviceSet != null) { @@ -275,7 +357,7 @@ public class DefaultDeviceStateService implements DeviceStateService { for (Device device : page.getData()) { //TODO 2.5 // if (!routingService.resolveById(device.getId()).isPresent()) { - fetchFutures.add(fetchDeviceState(device)); + fetchFutures.add(fetchDeviceState(device)); // } } try { @@ -319,105 +401,29 @@ public class DefaultDeviceStateService implements DeviceStateService { } } - private void onDeviceConnectSync(DeviceId deviceId) { - DeviceStateData stateData = getOrFetchDeviceStateData(deviceId); - if (stateData != null) { - long ts = System.currentTimeMillis(); - stateData.getState().setLastConnectTime(ts); - pushRuleEngineMessage(stateData, CONNECT_EVENT); - save(deviceId, LAST_CONNECT_TIME, ts); - } - } - - private void onDeviceDisconnectSync(DeviceId deviceId) { - DeviceStateData stateData = getOrFetchDeviceStateData(deviceId); - if (stateData != null) { - long ts = System.currentTimeMillis(); - stateData.getState().setLastDisconnectTime(ts); - pushRuleEngineMessage(stateData, DISCONNECT_EVENT); - save(deviceId, LAST_DISCONNECT_TIME, ts); - } - } - - private void onDeviceActivitySync(DeviceId deviceId) { - long lastReportedActivity = deviceLastReportedActivity.getOrDefault(deviceId, 0L); - long lastSavedActivity = deviceLastSavedActivity.getOrDefault(deviceId, 0L); - if (lastReportedActivity > 0 && lastReportedActivity > lastSavedActivity) { - DeviceStateData stateData = getOrFetchDeviceStateData(deviceId); - if (stateData != null) { - DeviceState state = stateData.getState(); - stateData.getState().setLastActivityTime(lastReportedActivity); - stateData.getMetaData().putValue("scope", SERVER_SCOPE); - pushRuleEngineMessage(stateData, ACTIVITY_EVENT); - save(deviceId, LAST_ACTIVITY_TIME, lastReportedActivity); - deviceLastSavedActivity.put(deviceId, lastReportedActivity); - if (!state.isActive()) { - state.setActive(true); - save(deviceId, ACTIVITY_STATE, state.isActive()); - } - } - } - } - private DeviceStateData getOrFetchDeviceStateData(DeviceId deviceId) { DeviceStateData deviceStateData = deviceStates.get(deviceId); if (deviceStateData == null) { //TODO 2.5 // if (!routingService.resolveById(deviceId).isPresent()) { - Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, deviceId); - if (device != null) { - try { - deviceStateData = fetchDeviceState(device).get(); - deviceStates.putIfAbsent(deviceId, deviceStateData); - } catch (InterruptedException | ExecutionException e) { - log.debug("[{}] Failed to fetch device state!", deviceId, e); - } + Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, deviceId); + if (device != null) { + try { + deviceStateData = fetchDeviceState(device).get(); + deviceStates.putIfAbsent(deviceId, deviceStateData); + } catch (InterruptedException | ExecutionException e) { + log.debug("[{}] Failed to fetch device state!", deviceId, e); } + } // } } return deviceStateData; } - private void onInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) { - if (inactivityTimeout == 0L) { - return; - } - DeviceStateData stateData = deviceStates.get(deviceId); - if (stateData != null) { - long ts = System.currentTimeMillis(); - DeviceState state = stateData.getState(); - state.setInactivityTimeout(inactivityTimeout); - boolean oldActive = state.isActive(); - state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout()); - if (!oldActive && state.isActive() || oldActive && !state.isActive()) { - save(deviceId, ACTIVITY_STATE, state.isActive()); - } - } - } - - private void onDeviceAddedSync(Device device) { - //TODO 2.5 -// Optional address = routingService.resolveById(device.getId()); -// if (!address.isPresent()) { - Futures.addCallback(fetchDeviceState(device), new FutureCallback() { - @Override - public void onSuccess(@Nullable DeviceStateData state) { - addDeviceUsingState(state); - } - - @Override - public void onFailure(Throwable t) { - log.warn("Failed to register device to the state service", t); - } - }); -// } else { -// sendDeviceEvent(device.getTenantId(), device.getId(), address.get(), true, false, false); -// } - } - - private void sendDeviceEvent(TenantId tenantId, DeviceId deviceId, ServerAddress address, boolean added, boolean updated, boolean deleted) { - log.trace("[{}][{}] Device is monitored on other server: {}", tenantId, deviceId, address); - ClusterAPIProtos.DeviceStateServiceMsgProto.Builder builder = ClusterAPIProtos.DeviceStateServiceMsgProto.newBuilder(); + private void sendDeviceEvent(TenantId tenantId, DeviceId deviceId, boolean added, boolean updated, boolean deleted) { + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId); + log.trace("[{}][{}] Device is monitored on partition: {}", tenantId, deviceId, tpi); + TransportProtos.DeviceStateServiceMsgProto.Builder builder = TransportProtos.DeviceStateServiceMsgProto.newBuilder(); builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()); builder.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()); @@ -425,43 +431,22 @@ public class DefaultDeviceStateService implements DeviceStateService { builder.setAdded(added); builder.setUpdated(updated); builder.setDeleted(deleted); - //TODO 2.5 -// clusterRpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_DEVICE_STATE_SERVICE_MESSAGE, builder.build().toByteArray()); - } - - private void onDeviceUpdatedSync(Device device) { - //TODO 2.5 -// Optional address = routingService.resolveById(device.getId()); -// if (!address.isPresent()) { - DeviceStateData stateData = getOrFetchDeviceStateData(device.getId()); - if (stateData != null) { - TbMsgMetaData md = new TbMsgMetaData(); - md.putValue("deviceName", device.getName()); - md.putValue("deviceType", device.getType()); - stateData.setMetaData(md); - } -// } else { -// sendDeviceEvent(device.getTenantId(), device.getId(), address.get(), false, true, false); -// } + TransportProtos.DeviceStateServiceMsgProto msg = builder.build(); + queueProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(deviceId.getId(), + TransportProtos.ToCoreMsg.newBuilder().setDeviceStateServiceMsg(msg).build()), null); } private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) { - //TODO 2.5 -// Optional address = routingService.resolveById(deviceId); -// if (!address.isPresent()) { - deviceStates.remove(deviceId); - deviceLastReportedActivity.remove(deviceId); - deviceLastSavedActivity.remove(deviceId); - Set deviceIds = tenantDevices.get(tenantId); - if (deviceIds != null) { - deviceIds.remove(deviceId); - if (deviceIds.isEmpty()) { - tenantDevices.remove(tenantId); - } + deviceStates.remove(deviceId); + deviceLastReportedActivity.remove(deviceId); + deviceLastSavedActivity.remove(deviceId); + Set deviceIds = tenantDevices.get(tenantId); + if (deviceIds != null) { + deviceIds.remove(deviceId); + if (deviceIds.isEmpty()) { + tenantDevices.remove(tenantId); } -// } else { -// sendDeviceEvent(tenantId, deviceId, address.get(), false, false, true); -// } + } } private ListenableFuture fetchDeviceState(Device device) { diff --git a/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java index 8dbe7579e3..636a9e9bb1 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java @@ -15,14 +15,17 @@ */ package org.thingsboard.server.service.state; +import org.springframework.context.ApplicationListener; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.msg.cluster.ServerAddress; +import org.thingsboard.server.discovery.PartitionChangeEvent; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.service.queue.TbMsgCallback; /** * Created by ashvayka on 01.05.18. */ -public interface DeviceStateService { +public interface DeviceStateService extends ApplicationListener { void onDeviceAdded(Device device); @@ -38,7 +41,6 @@ public interface DeviceStateService { void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout); - void onClusterUpdate(); + void onQueueMsg(TransportProtos.DeviceStateServiceMsgProto serverAddress, TbMsgCallback bytes); - void onRemoteMsg(ServerAddress serverAddress, byte[] bytes); } diff --git a/application/src/main/proto/cluster.proto b/application/src/main/proto/cluster.proto index cfacc66121..46526e8e1c 100644 --- a/application/src/main/proto/cluster.proto +++ b/application/src/main/proto/cluster.proto @@ -134,13 +134,3 @@ message FromDeviceRPCResponseProto { string response = 3; int32 error = 4; } - -message DeviceStateServiceMsgProto { - int64 tenantIdMSB = 1; - int64 tenantIdLSB = 2; - int64 deviceIdMSB = 3; - int64 deviceIdLSB = 4; - bool added = 5; - bool updated = 6; - bool deleted = 7; -} diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml index e147325f42..3d47db432f 100644 --- a/application/src/main/resources/logback.xml +++ b/application/src/main/resources/logback.xml @@ -27,6 +27,8 @@ + + diff --git a/common/queue/src/main/java/org/thingsboard/server/common/AbstractTbQueueTemplate.java b/common/queue/src/main/java/org/thingsboard/server/common/AbstractTbQueueTemplate.java index b70fc417c8..a6193a0b48 100644 --- a/common/queue/src/main/java/org/thingsboard/server/common/AbstractTbQueueTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/common/AbstractTbQueueTemplate.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -46,16 +46,13 @@ public class AbstractTbQueueTemplate { return new String(data, StandardCharsets.UTF_8); } - private static ByteBuffer longBuffer = ByteBuffer.allocate(Long.BYTES); - protected static byte[] longToBytes(long x) { + ByteBuffer longBuffer = ByteBuffer.allocate(Long.BYTES); longBuffer.putLong(0, x); return longBuffer.array(); } protected static long bytesToLong(byte[] bytes) { - longBuffer.put(bytes, 0, bytes.length); - longBuffer.flip();//need flip - return longBuffer.getLong(); + return ByteBuffer.wrap(bytes).getLong(); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/discovery/ConsistentHashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/discovery/ConsistentHashPartitionService.java index 25c936f829..831748da06 100644 --- a/common/queue/src/main/java/org/thingsboard/server/discovery/ConsistentHashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/discovery/ConsistentHashPartitionService.java @@ -101,6 +101,7 @@ public class ConsistentHashPartitionService implements PartitionService { return topicPartitions; } + //TODO 2.5 This should return cached TopicPartitionInfo objects instead of creating new one every time. @Override public TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId) { boolean isolated = isolatedTenants.get(tenantId) != null && isolatedTenants.get(tenantId).contains(serviceType); diff --git a/common/queue/src/main/java/org/thingsboard/server/discovery/PartitionDiscoveryService.java b/common/queue/src/main/java/org/thingsboard/server/discovery/DiscoveryService.java similarity index 93% rename from common/queue/src/main/java/org/thingsboard/server/discovery/PartitionDiscoveryService.java rename to common/queue/src/main/java/org/thingsboard/server/discovery/DiscoveryService.java index 6fe4571517..8272df5fdc 100644 --- a/common/queue/src/main/java/org/thingsboard/server/discovery/PartitionDiscoveryService.java +++ b/common/queue/src/main/java/org/thingsboard/server/discovery/DiscoveryService.java @@ -15,6 +15,6 @@ */ package org.thingsboard.server.discovery; -public interface PartitionDiscoveryService { +public interface DiscoveryService { } diff --git a/common/queue/src/main/java/org/thingsboard/server/discovery/DummyDiscoveryService.java b/common/queue/src/main/java/org/thingsboard/server/discovery/DummyDiscoveryService.java index 8983cf1e97..28fd5c1fb3 100644 --- a/common/queue/src/main/java/org/thingsboard/server/discovery/DummyDiscoveryService.java +++ b/common/queue/src/main/java/org/thingsboard/server/discovery/DummyDiscoveryService.java @@ -28,7 +28,7 @@ import java.util.Collections; @ConditionalOnProperty(prefix = "zk", value = "enabled", havingValue = "false", matchIfMissing = true) @Slf4j @DependsOn("environmentLogService") -public class DummyDiscoveryService implements PartitionDiscoveryService { +public class DummyDiscoveryService implements DiscoveryService { private final TbServiceInfoProvider serviceInfoProvider; private final PartitionService partitionService; diff --git a/common/queue/src/main/java/org/thingsboard/server/discovery/ZkPartitionDiscoveryService.java b/common/queue/src/main/java/org/thingsboard/server/discovery/ZkDiscoveryService.java similarity index 95% rename from common/queue/src/main/java/org/thingsboard/server/discovery/ZkPartitionDiscoveryService.java rename to common/queue/src/main/java/org/thingsboard/server/discovery/ZkDiscoveryService.java index 07f11617ac..48c886b7e4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/discovery/ZkPartitionDiscoveryService.java +++ b/common/queue/src/main/java/org/thingsboard/server/discovery/ZkDiscoveryService.java @@ -16,7 +16,6 @@ package org.thingsboard.server.discovery; import com.google.protobuf.InvalidProtocolBufferException; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -31,7 +30,6 @@ import org.apache.curator.retry.RetryForever; import org.apache.curator.utils.CloseableUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.event.ApplicationReadyEvent; @@ -39,18 +37,12 @@ import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; import org.springframework.util.Assert; import org.thingsboard.common.util.ThingsBoardThreadFactory; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.gen.transport.TransportProtos; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; @@ -60,7 +52,7 @@ import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent. @Service @ConditionalOnProperty(prefix = "zk", value = "enabled", havingValue = "true", matchIfMissing = false) @Slf4j -public class ZkPartitionDiscoveryService implements PartitionDiscoveryService, PathChildrenCacheListener { +public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheListener { @Value("${zk.url}") private String zkUrl; @@ -84,7 +76,7 @@ public class ZkPartitionDiscoveryService implements PartitionDiscoveryService, P private volatile boolean stopped = true; - public ZkPartitionDiscoveryService(TbServiceInfoProvider serviceInfoProvider, PartitionService partitionService) { + public ZkDiscoveryService(TbServiceInfoProvider serviceInfoProvider, PartitionService partitionService) { this.serviceInfoProvider = serviceInfoProvider; this.partitionService = partitionService; } diff --git a/common/queue/src/main/java/org/thingsboard/server/provider/KafkaMonolithQueueProvider.java b/common/queue/src/main/java/org/thingsboard/server/provider/KafkaMonolithQueueProvider.java index 01e7a211d3..b7412a7c99 100644 --- a/common/queue/src/main/java/org/thingsboard/server/provider/KafkaMonolithQueueProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/provider/KafkaMonolithQueueProvider.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -45,6 +45,8 @@ public class KafkaMonolithQueueProvider implements TbCoreQueueProvider, TbRuleEn private final TbQueueTransportApiSettings transportApiSettings; private final TbQueueTransportNotificationSettings transportNotificationSettings; + private TbQueueProducer> tbCoreProducer; + public KafkaMonolithQueueProvider(TbKafkaSettings kafkaSettings, TbNodeIdProvider nodeIdProvider, TbQueueCoreSettings coreSettings, @@ -77,13 +79,21 @@ public class KafkaMonolithQueueProvider implements TbCoreQueueProvider, TbRuleEn return requestBuilder.build(); } + //TODO 2.5 Singleton @Override public TbQueueProducer> getTbCoreMsgProducer() { - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder> requestBuilder = TBKafkaProducerTemplate.builder(); - requestBuilder.settings(kafkaSettings); - requestBuilder.clientId("producer-core-" + nodeIdProvider.getNodeId()); - requestBuilder.defaultTopic(coreSettings.getTopic()); - return requestBuilder.build(); + if (tbCoreProducer == null) { + synchronized (this) { + if (tbCoreProducer == null) { + TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder> requestBuilder = TBKafkaProducerTemplate.builder(); + requestBuilder.settings(kafkaSettings); + requestBuilder.clientId("producer-core-" + nodeIdProvider.getNodeId()); + requestBuilder.defaultTopic(coreSettings.getTopic()); + tbCoreProducer = requestBuilder.build(); + } + } + } + return tbCoreProducer; } @Override diff --git a/common/queue/src/main/proto/transport.proto b/common/queue/src/main/proto/queue.proto similarity index 95% rename from common/queue/src/main/proto/transport.proto rename to common/queue/src/main/proto/queue.proto index 8c564b6540..17f8b9c31c 100644 --- a/common/queue/src/main/proto/transport.proto +++ b/common/queue/src/main/proto/queue.proto @@ -239,6 +239,20 @@ message DeviceActorToTransportMsg { ToServerRpcResponseMsg toServerResponse = 7; } +/** + * TB Core to TB Core messages + */ + +message DeviceStateServiceMsgProto { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + int64 deviceIdMSB = 3; + int64 deviceIdLSB = 4; + bool added = 5; + bool updated = 6; + bool deleted = 7; +} + /** * Main messages; */ @@ -259,6 +273,7 @@ message TransportApiResponseMsg { /* Messages that are handled by ThingsBoard Core Service */ message ToCoreMsg { TransportToDeviceActorMsg toDeviceActorMsg = 1; + DeviceStateServiceMsgProto deviceStateServiceMsg = 2; } /* Messages that are handled by ThingsBoard RuleEngine Service */