diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 0896c5eae2..54737f84d5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -65,6 +65,7 @@ import org.thingsboard.server.service.executors.ExternalCallExecutorService; import org.thingsboard.server.service.mail.MailExecutorService; import org.thingsboard.server.service.rpc.DeviceRpcService; import org.thingsboard.server.service.script.JsExecutorService; +import org.thingsboard.server.service.state.DeviceStateService; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import java.io.IOException; @@ -197,6 +198,10 @@ public class ActorSystemContext { @Getter private MsgQueue msgQueue; + @Autowired + @Getter + private DeviceStateService deviceStateService; + @Value("${actors.session.sync.timeout}") @Getter private long syncSessionTimeout; diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index cd043bf51f..5112f220de 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -265,17 +265,32 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso break; case POST_ATTRIBUTES_REQUEST: handlePostAttributesRequest(context, msg); + reportActivity(); break; case POST_TELEMETRY_REQUEST: handlePostTelemetryRequest(context, msg); + reportActivity(); break; case TO_SERVER_RPC_REQUEST: handleClientSideRPCRequest(context, msg); + reportActivity(); break; } } } + private void reportActivity() { + systemContext.getDeviceStateService().onDeviceActivity(deviceId); + } + + private void reportSessionOpen() { + systemContext.getDeviceStateService().onDeviceConnect(deviceId); + } + + private void reportSessionClose() { + systemContext.getDeviceStateService().onDeviceDisconnect(deviceId); + } + private void handleGetAttributesRequest(DeviceToDeviceActorMsg src) { GetAttributesRequest request = (GetAttributesRequest) src.getPayload(); ListenableFuture> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, request.getClientAttributeNames()); @@ -488,11 +503,17 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (inMsg instanceof SessionOpenMsg) { logger.debug("[{}] Processing new session [{}]", deviceId, sessionId); sessions.put(sessionId, new SessionInfo(SessionType.ASYNC, msg.getServerAddress())); + if (sessions.size() == 1) { + reportSessionOpen(); + } } else if (inMsg instanceof SessionCloseMsg) { logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId); sessions.remove(sessionId); attributeSubscriptions.remove(sessionId); rpcSubscriptions.remove(sessionId); + if (sessions.isEmpty()) { + reportSessionClose(); + } } } diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java index 0be0385365..e15eb2f3dd 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java @@ -32,4 +32,5 @@ public interface ActorService extends SessionMsgProcessor, WebSocketMsgProcessor void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId); void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType); + } diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index 79d8280345..a35e5dcc3c 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -63,6 +63,7 @@ import org.thingsboard.server.exception.ThingsboardErrorResponseHandler; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.service.component.ComponentDiscoveryService; import org.thingsboard.server.service.security.model.SecurityUser; +import org.thingsboard.server.service.state.DeviceStateService; import javax.mail.MessagingException; import javax.servlet.http.HttpServletRequest; @@ -137,6 +138,9 @@ public abstract class BaseController { @Autowired protected DeviceOfflineService offlineService; + @Autowired + protected DeviceStateService deviceStateService; + @ExceptionHandler(ThingsboardException.class) public void handleThingsboardException(ThingsboardException ex, HttpServletResponse response) { errorResponseHandler.handle(ex, response); diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java index f97603e40f..5322b2c017 100644 --- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java +++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java @@ -90,6 +90,11 @@ public class DeviceController extends BaseController { savedDevice.getCustomerId(), device.getId() == null ? ActionType.ADDED : ActionType.UPDATED, null); + if (device.getId() == null) { + deviceStateService.onDeviceAdded(savedDevice); + } else { + deviceStateService.onDeviceUpdated(savedDevice); + } return savedDevice; } catch (Exception e) { logEntityAction(emptyId(EntityType.DEVICE), device, @@ -112,6 +117,7 @@ public class DeviceController extends BaseController { device.getCustomerId(), ActionType.DELETED, null, strDeviceId); + deviceStateService.onDeviceDeleted(device); } catch (Exception e) { logEntityAction(emptyId(EntityType.DEVICE), null, @@ -387,7 +393,7 @@ public class DeviceController extends BaseController { @RequestMapping(value = "/device/online", method = RequestMethod.GET) @ResponseBody public List getOnlineDevices(@RequestParam("contactType") DeviceStatusQuery.ContactType contactType, - @RequestParam("threshold") long threshold) throws ThingsboardException { + @RequestParam("threshold") long threshold) throws ThingsboardException { try { TenantId tenantId = getCurrentUser().getTenantId(); ListenableFuture> offlineDevices = offlineService.findOnlineDevices(tenantId.getId(), contactType, threshold); diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java new file mode 100644 index 0000000000..fc17ccd416 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -0,0 +1,390 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.state; + +import com.datastax.driver.core.utils.UUIDs; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.thingsboard.server.actors.service.ActorService; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.Tenant; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.page.TextPageLink; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgDataType; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; +import org.thingsboard.server.dao.attributes.AttributesService; +import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.dao.tenant.TenantService; +import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; + +import javax.annotation.Nullable; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.thingsboard.server.common.data.DataConstants.ACTIVITY_EVENT; +import static org.thingsboard.server.common.data.DataConstants.CONNECT_EVENT; +import static org.thingsboard.server.common.data.DataConstants.DISCONNECT_EVENT; +import static org.thingsboard.server.common.data.DataConstants.INACTIVITY_EVENT; + +/** + * Created by ashvayka on 01.05.18. + */ +@Service +@Slf4j +//TODO: refactor to use page links as cursor and not fetch all +public class DefaultDeviceStateService implements DeviceStateService { + + private static final ObjectMapper json = new ObjectMapper(); + public static final String ACTIVITY_STATE = "active"; + public static final String LAST_CONNECT_TIME = "lastConnectTime"; + public static final String LAST_DISCONNECT_TIME = "lastDisconnectTime"; + public static final String LAST_ACTIVITY_TIME = "lastActivityTime"; + public static final String INACTIVITY_ALARM_TIME = "inactivityAlarmTime"; + public static final String INACTIVITY_TIMEOUT = "inactivityTimeout"; + + public static final List PERSISTENT_ATTRIBUTES = Arrays.asList(ACTIVITY_STATE, LAST_CONNECT_TIME, LAST_DISCONNECT_TIME, LAST_ACTIVITY_TIME, INACTIVITY_ALARM_TIME, INACTIVITY_TIMEOUT); + + @Autowired + private TenantService tenantService; + + @Autowired + private DeviceService deviceService; + + @Autowired + private AttributesService attributesService; + + @Autowired + private ActorService actorService; + + @Autowired + private TelemetrySubscriptionService tsSubService; + + @Value("${state.defaultInactivityTimeoutInSec}") + @Getter + private long defaultInactivityTimeoutInSec; + + @Value("${state.defaultStateCheckIntervalInSec}") + @Getter + private long defaultStateCheckIntervalInSec; + +// TODO in v2.1 +// @Value("${state.defaultStatePersistenceIntervalInSec}") +// @Getter +// private long defaultStatePersistenceIntervalInSec; +// +// @Value("${state.defaultStatePersistencePack}") +// @Getter +// private long defaultStatePersistencePack; + + private ListeningScheduledExecutorService queueExecutor; + + private ConcurrentMap> tenantDevices = new ConcurrentHashMap<>(); + private ConcurrentMap deviceStates = new ConcurrentHashMap<>(); + + @PostConstruct + public void init() { + // Should be always single threaded due to absence of locks. + queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor()); + queueExecutor.submit(this::initStateFromDB); + queueExecutor.scheduleAtFixedRate(this::updateState, defaultStateCheckIntervalInSec, defaultStateCheckIntervalInSec, TimeUnit.SECONDS); + //TODO: schedule persistence in v2.1; + } + + @PreDestroy + public void stop() { + if (queueExecutor != null) { + queueExecutor.shutdownNow(); + } + } + + @Override + public void onDeviceAdded(Device device) { + queueExecutor.submit(() -> onDeviceAddedSync(device)); + } + + @Override + public void onDeviceUpdated(Device device) { + queueExecutor.submit(() -> onDeviceUpdatedSync(device)); + } + + @Override + public void onDeviceConnect(DeviceId deviceId) { + queueExecutor.submit(() -> onDeviceConnectSync(deviceId)); + } + + @Override + public void onDeviceActivity(DeviceId deviceId) { + queueExecutor.submit(() -> onDeviceActivitySync(deviceId)); + } + + @Override + public void onDeviceDisconnect(DeviceId deviceId) { + queueExecutor.submit(() -> onDeviceDisconnectSync(deviceId)); + } + + @Override + public void onDeviceDeleted(Device device) { + queueExecutor.submit(() -> onDeviceDeleted(device.getTenantId(), device.getId())); + } + + @Override + public void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) { + queueExecutor.submit(() -> onInactivityTimeoutUpdate(deviceId, inactivityTimeout)); + } + + @Override + public Optional getDeviceState(DeviceId deviceId) { + DeviceStateData state = deviceStates.get(deviceId); + if (state != null) { + return Optional.of(state.getState()); + } else { + return Optional.empty(); + } + } + + private void initStateFromDB() { + List tenants = tenantService.findTenants(new TextPageLink(Integer.MAX_VALUE)).getData(); + for (Tenant tenant : tenants) { + List> fetchFutures = new ArrayList<>(); + List devices = deviceService.findDevicesByTenantId(tenant.getId(), new TextPageLink(Integer.MAX_VALUE)).getData(); + for (Device device : devices) { + fetchFutures.add(fetchDeviceState(device)); + } + try { + Futures.successfulAsList(fetchFutures).get().forEach(this::addDeviceUsingState); + } catch (InterruptedException | ExecutionException e) { + log.warn("Failed to init device state service from DB", e); + } + } + } + + private void addDeviceUsingState(DeviceStateData state) { + tenantDevices.computeIfAbsent(state.getTenantId(), id -> ConcurrentHashMap.newKeySet()).add(state.getDeviceId()); + deviceStates.put(state.getDeviceId(), state); + } + + private void updateState() { + long ts = System.currentTimeMillis(); + Set deviceIds = new HashSet<>(deviceStates.keySet()); + for (DeviceId deviceId : deviceIds) { + DeviceStateData stateData = deviceStates.get(deviceId); + DeviceState state = stateData.getState(); + state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout()); + if (!state.isActive() && state.getLastInactivityAlarmTime() < state.getLastActivityTime()) { + state.setLastInactivityAlarmTime(ts); + pushRuleEngineMessage(stateData, INACTIVITY_EVENT); + saveAttribute(deviceId, INACTIVITY_ALARM_TIME, ts); + saveAttribute(deviceId, ACTIVITY_STATE, state.isActive()); + } + } + } + + private void onDeviceConnectSync(DeviceId deviceId) { + DeviceStateData stateData = deviceStates.get(deviceId); + if (stateData != null) { + long ts = System.currentTimeMillis(); + stateData.getState().setLastConnectTime(ts); + pushRuleEngineMessage(stateData, CONNECT_EVENT); + saveAttribute(deviceId, LAST_CONNECT_TIME, ts); + } + } + + private void onDeviceDisconnectSync(DeviceId deviceId) { + DeviceStateData stateData = deviceStates.get(deviceId); + if (stateData != null) { + long ts = System.currentTimeMillis(); + stateData.getState().setLastDisconnectTime(ts); + pushRuleEngineMessage(stateData, DISCONNECT_EVENT); + saveAttribute(deviceId, LAST_DISCONNECT_TIME, ts); + } + } + + private void onDeviceActivitySync(DeviceId deviceId) { + DeviceStateData stateData = deviceStates.get(deviceId); + if (stateData != null) { + DeviceState state = stateData.getState(); + long ts = System.currentTimeMillis(); + state.setActive(true); + stateData.getState().setLastActivityTime(ts); + pushRuleEngineMessage(stateData, ACTIVITY_EVENT); + saveAttribute(deviceId, LAST_ACTIVITY_TIME, ts); + saveAttribute(deviceId, ACTIVITY_STATE, state.isActive()); + } + } + + private void onInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) { + if (inactivityTimeout == 0L) { + return; + } + DeviceStateData stateData = deviceStates.get(deviceId); + if (stateData != null) { + long ts = System.currentTimeMillis(); + DeviceState state = stateData.getState(); + state.setInactivityTimeout(inactivityTimeout); + boolean oldActive = state.isActive(); + state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout()); + if (!oldActive && state.isActive()) { + saveAttribute(deviceId, ACTIVITY_STATE, state.isActive()); + } + } + } + + private void onDeviceAddedSync(Device device) { + Futures.addCallback(fetchDeviceState(device), new FutureCallback() { + @Override + public void onSuccess(@Nullable DeviceStateData state) { + addDeviceUsingState(state); + } + + @Override + public void onFailure(Throwable t) { + log.warn("Failed to register device to the state service", t); + } + }); + } + + private void onDeviceUpdatedSync(Device device) { + DeviceStateData stateData = deviceStates.get(device.getId()); + if (stateData != null) { + TbMsgMetaData md = new TbMsgMetaData(); + md.putValue("deviceName", device.getName()); + md.putValue("deviceType", device.getType()); + stateData.setMetaData(md); + } + } + + private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) { + deviceStates.remove(deviceId); + Set deviceIds = tenantDevices.get(tenantId); + if (deviceIds != null) { + deviceIds.remove(deviceId); + if (deviceIds.isEmpty()) { + tenantDevices.remove(tenantId); + } + } + } + + private ListenableFuture fetchDeviceState(Device device) { + ListenableFuture> attributes = attributesService.find(device.getId(), DataConstants.SERVER_SCOPE, PERSISTENT_ATTRIBUTES); + return Futures.transform(attributes, new Function, DeviceStateData>() { + @Nullable + @Override + public DeviceStateData apply(@Nullable List attributes) { + long lastActivityTime = getAttributeValue(attributes, LAST_ACTIVITY_TIME, 0L); + long inactivityAlarmTime = getAttributeValue(attributes, INACTIVITY_ALARM_TIME, 0L); + long inactivityTimeout = getAttributeValue(attributes, INACTIVITY_TIMEOUT, TimeUnit.SECONDS.toMillis(defaultInactivityTimeoutInSec)); + boolean active = System.currentTimeMillis() < lastActivityTime + inactivityTimeout; + DeviceState deviceState = DeviceState.builder() + .active(active) + .lastConnectTime(getAttributeValue(attributes, LAST_CONNECT_TIME, 0L)) + .lastDisconnectTime(getAttributeValue(attributes, LAST_DISCONNECT_TIME, 0L)) + .lastActivityTime(lastActivityTime) + .lastInactivityAlarmTime(inactivityAlarmTime) + .inactivityTimeout(inactivityTimeout) + .build(); + TbMsgMetaData md = new TbMsgMetaData(); + md.putValue("deviceName", device.getName()); + md.putValue("deviceType", device.getType()); + return DeviceStateData.builder() + .tenantId(device.getTenantId()) + .deviceId(device.getId()) + .metaData(md) + .state(deviceState).build(); + } + }); + } + + private long getLastPersistTime(List attributes) { + return attributes.stream().map(AttributeKvEntry::getLastUpdateTs).max(Long::compare).orElse(0L); + } + + private long getAttributeValue(List attributes, String attributeName, long defaultValue) { + for (AttributeKvEntry attribute : attributes) { + if (attribute.getKey().equals(attributeName)) { + return attribute.getLongValue().orElse(defaultValue); + } + } + return defaultValue; + } + + private void pushRuleEngineMessage(DeviceStateData stateData, String msgType) { + DeviceState state = stateData.getState(); + try { + TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), msgType, stateData.getDeviceId(), stateData.getMetaData(), TbMsgDataType.JSON + , json.writeValueAsString(state) + , null, null, 0L); + actorService.onMsg(new ServiceToRuleEngineMsg(stateData.getTenantId(), tbMsg)); + } catch (Exception e) { + log.warn("[{}] Failed to push inactivity alarm: {}", stateData.getDeviceId(), state, e); + } + } + + private void saveAttribute(DeviceId deviceId, String key, long value) { + tsSubService.saveAttrAndNotify(deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback(deviceId, key, value)); + } + + private void saveAttribute(DeviceId deviceId, String key, boolean value) { + tsSubService.saveAttrAndNotify(deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback(deviceId, key, value)); + } + + private class AttributeSaveCallback implements FutureCallback { + private final DeviceId deviceId; + private final String key; + private final Object value; + + AttributeSaveCallback(DeviceId deviceId, String key, Object value) { + this.deviceId = deviceId; + this.key = key; + this.value = value; + } + + @Override + public void onSuccess(@Nullable Void result) { + log.trace("[{}] Successfully updated attribute [{}] with value [{}]", deviceId, key, value); + } + + @Override + public void onFailure(Throwable t) { + log.warn("[{}] Failed to update attribute [{}] with value [{}]", deviceId, key, value, t); + } + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/state/DeviceState.java b/application/src/main/java/org/thingsboard/server/service/state/DeviceState.java new file mode 100644 index 0000000000..29167ee519 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/state/DeviceState.java @@ -0,0 +1,35 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.state; + +import lombok.Builder; +import lombok.Data; + +/** + * Created by ashvayka on 01.05.18. + */ +@Data +@Builder +public class DeviceState { + + private boolean active; + private long lastConnectTime; + private long lastActivityTime; + private long lastDisconnectTime; + private long lastInactivityAlarmTime; + private long inactivityTimeout; + +} diff --git a/application/src/main/java/org/thingsboard/server/service/state/DeviceStateData.java b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateData.java new file mode 100644 index 0000000000..cb9eeeaea7 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateData.java @@ -0,0 +1,37 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.state; + +import lombok.Builder; +import lombok.Data; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.TbMsgMetaData; + +/** + * Created by ashvayka on 01.05.18. + */ +@Data +@Builder +class DeviceStateData { + + private final TenantId tenantId; + private final DeviceId deviceId; + + private TbMsgMetaData metaData; + private final DeviceState state; + +} diff --git a/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java new file mode 100644 index 0000000000..37f785cf44 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java @@ -0,0 +1,44 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.state; + +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.id.DeviceId; + +import java.util.Optional; + +/** + * Created by ashvayka on 01.05.18. + */ +public interface DeviceStateService { + + void onDeviceAdded(Device device); + + void onDeviceUpdated(Device device); + + void onDeviceDeleted(Device device); + + void onDeviceConnect(DeviceId deviceId); + + void onDeviceActivity(DeviceId deviceId); + + void onDeviceDisconnect(DeviceId deviceId); + + void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout); + + Optional getDeviceState(DeviceId deviceId); + +} diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 58bbec550e..00a337a38b 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -20,11 +20,20 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.BooleanDataEntry; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.dao.attributes.AttributesService; @@ -34,11 +43,14 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription; import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState; import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate; import org.thingsboard.server.service.cluster.routing.ClusterRoutingService; +import org.thingsboard.server.service.state.DefaultDeviceStateService; +import org.thingsboard.server.service.state.DeviceStateService; import javax.annotation.Nullable; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -70,6 +82,10 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio @Autowired private ClusterRoutingService routingService; + @Autowired + @Lazy + private DeviceStateService stateService; + private ExecutorService tsCallBackExecutor; private ExecutorService wsCallBackExecutor; @@ -149,10 +165,41 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio addWsCallback(saveFuture, success -> onAttributesUpdate(entityId, scope, attributes)); } + @Override + public void saveAttrAndNotify(EntityId entityId, String scope, String key, long value, FutureCallback callback) { + saveAndNotify(entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(key, value) + , System.currentTimeMillis())), callback); + } + + @Override + public void saveAttrAndNotify(EntityId entityId, String scope, String key, String value, FutureCallback callback) { + saveAndNotify(entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry(key, value) + , System.currentTimeMillis())), callback); + } + + @Override + public void saveAttrAndNotify(EntityId entityId, String scope, String key, double value, FutureCallback callback) { + saveAndNotify(entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new DoubleDataEntry(key, value) + , System.currentTimeMillis())), callback); + } + + @Override + public void saveAttrAndNotify(EntityId entityId, String scope, String key, boolean value, FutureCallback callback) { + saveAndNotify(entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new BooleanDataEntry(key, value) + , System.currentTimeMillis())), callback); + } + private void onAttributesUpdate(EntityId entityId, String scope, List attributes) { Optional serverAddress = routingService.resolveById(entityId); if (!serverAddress.isPresent()) { onLocalAttributesUpdate(entityId, scope, attributes); + if (entityId.getEntityType() == EntityType.DEVICE && DataConstants.SERVER_SCOPE.equalsIgnoreCase(scope)) { + for (AttributeKvEntry attribute : attributes) { + if (attribute.getKey().equals(DefaultDeviceStateService.INACTIVITY_TIMEOUT)) { + stateService.onDeviceInactivityTimeoutUpdate(new DeviceId(entityId.getId()), attribute.getLongValue().orElse(0L)); + } + } + } } else { // rpcHandler.onAttributesUpdate(ctx, serverAddress.get(), entityId, entries); } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 5d232d203c..af5b8d4de2 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -359,4 +359,11 @@ audit_log: host: "${AUDIT_LOG_SINK_HOST:localhost}" port: "${AUDIT_LOG_SINK_POST:9200}" user_name: "${AUDIT_LOG_SINK_USER_NAME:}" - password: "${AUDIT_LOG_SINK_PASSWORD:}" \ No newline at end of file + password: "${AUDIT_LOG_SINK_PASSWORD:}" + +state: + defaultInactivityTimeoutInSec: 10 + defaultStateCheckIntervalInSec: 10 +# TODO in v2.1 +# defaultStatePersistenceIntervalInSec: 60 +# defaultStatePersistencePack: 100 \ No newline at end of file diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index 7d4e4807c9..f4e95596e8 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -45,4 +45,9 @@ public class DataConstants { public static final String IN = "IN"; public static final String OUT = "OUT"; + public static final String INACTIVITY_EVENT = "INACTIVITY_EVENT"; + public static final String CONNECT_EVENT = "CONNECT_EVENT"; + public static final String DISCONNECT_EVENT = "DISCONNECT_EVENT"; + public static final String ACTIVITY_EVENT = "ACTIVITY_EVENT"; + } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java index 1ba18cd52f..aa57a027b8 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java @@ -33,4 +33,12 @@ public interface RuleEngineTelemetryService { void saveAndNotify(EntityId entityId, String scope, List attributes, FutureCallback callback); + void saveAttrAndNotify(EntityId entityId, String scope, String key, long value, FutureCallback callback); + + void saveAttrAndNotify(EntityId entityId, String scope, String key, String value, FutureCallback callback); + + void saveAttrAndNotify(EntityId entityId, String scope, String key, double value, FutureCallback callback); + + void saveAttrAndNotify(EntityId entityId, String scope, String key, boolean value, FutureCallback callback); + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java index 37c6443075..75329a031c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java @@ -18,6 +18,7 @@ package org.thingsboard.rule.engine.filter; import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.TbNodeUtils; import org.thingsboard.rule.engine.api.*; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.session.SessionMsgType; @@ -27,7 +28,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType; type = ComponentType.FILTER, name = "message type switch", configClazz = EmptyNodeConfiguration.class, - relationTypes = {"Post attributes", "Post telemetry", "RPC Request", "Other"}, + relationTypes = {"Post attributes", "Post telemetry", "RPC Request", "Activity Event", "Inactivity Event", "Connect Event", "Disconnect Event", "Other"}, nodeDescription = "Route incoming messages by Message Type", nodeDetails = "Sends messages with message types \"Post attributes\", \"Post telemetry\", \"RPC Request\" via corresponding chain, otherwise Other chain is used.", uiResources = {"static/rulenode/rulenode-core-config.js"}, @@ -50,7 +51,15 @@ public class TbMsgTypeSwitchNode implements TbNode { relationType = "Post telemetry"; } else if (msg.getType().equals(SessionMsgType.TO_SERVER_RPC_REQUEST.name())) { relationType = "RPC Request"; - } else { + } else if (msg.getType().equals(DataConstants.ACTIVITY_EVENT)) { + relationType = "Activity Event"; + } else if (msg.getType().equals(DataConstants.INACTIVITY_EVENT)) { + relationType = "Inactivity Event"; + } else if (msg.getType().equals(DataConstants.CONNECT_EVENT)) { + relationType = "Connect Event"; + } else if (msg.getType().equals(DataConstants.DISCONNECT_EVENT)) { + relationType = "Disconnect Event"; + } else { relationType = "Other"; } ctx.tellNext(msg, relationType);