From 58ec7a335af732810cad08cca96e109a44a7d6a2 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Tue, 20 Dec 2016 11:50:52 +0200 Subject: [PATCH 1/2] Credentials revocation --- .../server/actors/device/DeviceActor.java | 3 ++ .../device/DeviceActorMessageProcessor.java | 26 ++++++++----- .../server/actors/service/ActorService.java | 3 ++ .../actors/service/DefaultActorService.java | 15 +++++++ .../actors/session/ASyncMsgProcessor.java | 39 ++++++++++++------- .../actors/session/SyncMsgProcessor.java | 2 +- .../server/controller/DeviceController.java | 19 +++++---- .../src/main/resources/thingsboard.yml | 2 +- .../msg/core/SessionCloseNotification.java | 38 ++++++++++++++++++ .../common/msg/core/SessionOpenMsg.java | 29 ++++++++++++++ .../server/common/msg/session/MsgType.java | 2 +- .../msg/session/ctrl/SessionCloseMsg.java | 20 +++++++++- ...eviceCredentialsUpdateNotificationMsg.java | 36 +++++++++++++++++ tools/src/main/resources/test.properties | 5 +++ .../coap/session/CoapSessionCtx.java | 5 ++- .../transport/mqtt/MqttTransportHandler.java | 3 +- .../mqtt/session/MqttSessionCtx.java | 8 +++- 17 files changed, 214 insertions(+), 41 deletions(-) create mode 100644 common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionCloseNotification.java create mode 100644 common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionOpenMsg.java create mode 100644 extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceCredentialsUpdateNotificationMsg.java create mode 100644 tools/src/main/resources/test.properties diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java index 8b669e9060..bf18df659a 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java @@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; import org.thingsboard.server.common.msg.device.ToDeviceActorMsg; import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg; +import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg; import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg; import org.thingsboard.server.extensions.api.plugins.msg.*; @@ -58,6 +59,8 @@ public class DeviceActor extends ContextAwareActor { processor.processAttributesUpdate(context(), (DeviceAttributesEventNotificationMsg) msg); } else if (msg instanceof ToDeviceRpcRequestPluginMsg) { processor.processRpcRequest(context(), (ToDeviceRpcRequestPluginMsg) msg); + } else if (msg instanceof DeviceCredentialsUpdateNotificationMsg){ + processor.processCredentialsUpdate(context(), (DeviceCredentialsUpdateNotificationMsg) msg); } } else if (msg instanceof TimeoutMsg) { processor.processTimeout(context(), (TimeoutMsg) msg); diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 3949691938..3aef0c8c6d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -32,13 +32,7 @@ import org.thingsboard.server.common.data.kv.AttributeKey; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; -import org.thingsboard.server.common.msg.core.AttributesUpdateNotification; -import org.thingsboard.server.common.msg.core.BasicCommandAckResponse; -import org.thingsboard.server.common.msg.core.BasicToDeviceSessionActorMsg; -import org.thingsboard.server.common.msg.core.SessionCloseMsg; -import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg; -import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg; -import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg; +import org.thingsboard.server.common.msg.core.*; import org.thingsboard.server.common.msg.device.ToDeviceActorMsg; import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg; import org.thingsboard.server.common.msg.session.FromDeviceMsg; @@ -47,6 +41,7 @@ import org.thingsboard.server.common.msg.session.SessionType; import org.thingsboard.server.common.msg.session.ToDeviceMsg; import org.thingsboard.server.extensions.api.device.DeviceAttributes; import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg; +import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg; import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse; import org.thingsboard.server.extensions.api.plugins.msg.RpcError; import org.thingsboard.server.extensions.api.plugins.msg.TimeoutIntMsg; @@ -74,6 +69,7 @@ import java.util.stream.Collectors; public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { private final DeviceId deviceId; + private final Map sessions; private final Map attributeSubscriptions; private final Map rpcSubscriptions; @@ -85,6 +81,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso public DeviceActorMessageProcessor(ActorSystemContext systemContext, LoggingAdapter logger, DeviceId deviceId) { super(systemContext, logger); this.deviceId = deviceId; + this.sessions = new HashMap<>(); this.attributeSubscriptions = new HashMap<>(); this.rpcSubscriptions = new HashMap<>(); this.rpcPendingMap = new HashMap<>(); @@ -281,7 +278,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (!msg.isAdded()) { logger.debug("[{}] Clearing attributes/rpc subscription for server [{}]", deviceId, msg.getServerAddress()); Predicate> filter = e -> e.getValue().getServer() - .map(serverAddress -> serverAddress.equals(msg.getServerAddress())).orElse(false); + .map(serverAddress -> serverAddress.equals(msg.getServerAddress())).orElse(false); attributeSubscriptions.entrySet().removeIf(filter); rpcSubscriptions.entrySet().removeIf(filter); } @@ -342,8 +339,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso private void processSessionStateMsgs(ToDeviceActorMsg msg) { SessionId sessionId = msg.getSessionId(); FromDeviceMsg inMsg = msg.getPayload(); - if (inMsg instanceof SessionCloseMsg) { + if (inMsg instanceof SessionOpenMsg) { + logger.debug("[{}] Processing new session [{}]", deviceId, sessionId); + sessions.put(sessionId, new SessionInfo(SessionType.ASYNC, msg.getServerAddress())); + } else if (inMsg instanceof SessionCloseMsg) { logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId); + sessions.remove(sessionId); attributeSubscriptions.remove(sessionId); rpcSubscriptions.remove(sessionId); } @@ -363,4 +364,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso return systemContext.getAttributesService().findAll(this.deviceId, attributeType); } + public void processCredentialsUpdate(ActorContext context, DeviceCredentialsUpdateNotificationMsg msg) { + sessions.forEach((k, v) -> { + sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(new SessionCloseNotification(), k), v.getServer()); + }); + attributeSubscriptions.clear(); + rpcSubscriptions.clear(); + } } diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java index 1c64f546c1..3db72106cf 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.actors.service; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.PluginId; import org.thingsboard.server.common.data.id.RuleId; import org.thingsboard.server.common.data.id.TenantId; @@ -28,4 +29,6 @@ public interface ActorService extends SessionMsgProcessor, WebSocketMsgProcessor void onPluginStateChange(TenantId tenantId, PluginId pluginId, ComponentLifecycleEvent state); void onRuleStateChange(TenantId tenantId, RuleId ruleId, ComponentLifecycleEvent state); + + void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId); } diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java index db6526d24e..bbf1300df4 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java @@ -32,16 +32,19 @@ import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg; import org.thingsboard.server.actors.rpc.RpcSessionTellMsg; import org.thingsboard.server.actors.session.SessionManagerActor; import org.thingsboard.server.actors.stats.StatsActor; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.PluginId; import org.thingsboard.server.common.data.id.RuleId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.msg.aware.SessionAwareMsg; import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; +import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg; import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg; import org.thingsboard.server.common.msg.device.ToDeviceActorMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; +import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg; import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg; import org.thingsboard.server.extensions.api.plugins.msg.ToPluginActorMsg; import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg; @@ -56,6 +59,7 @@ import scala.concurrent.duration.Duration; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import java.util.Optional; @Service @Slf4j @@ -221,6 +225,17 @@ public class DefaultActorService implements ActorService { broadcast(ComponentLifecycleMsg.forRule(tenantId, ruleId, state)); } + @Override + public void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId) { + DeviceCredentialsUpdateNotificationMsg msg = new DeviceCredentialsUpdateNotificationMsg(tenantId, deviceId); + Optional address = actorContext.getRoutingService().resolve(deviceId); + if (address.isPresent()) { + rpcService.tell(address.get(), msg); + } else { + onMsg(msg); + } + } + public void broadcast(ToAllNodesMsg msg) { rpcService.broadcast(msg); appActor.tell(msg, ActorRef.noSender()); diff --git a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java index 90b0cb316a..916e678ca7 100644 --- a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java @@ -20,15 +20,14 @@ import org.thingsboard.server.actors.shared.SessionTimeoutMsg; import org.thingsboard.server.common.data.id.SessionId; import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; -import org.thingsboard.server.common.msg.core.AttributesSubscribeMsg; -import org.thingsboard.server.common.msg.core.ResponseMsg; -import org.thingsboard.server.common.msg.core.RpcSubscribeMsg; +import org.thingsboard.server.common.msg.core.*; import org.thingsboard.server.common.msg.core.SessionCloseMsg; import org.thingsboard.server.common.msg.device.ToDeviceActorMsg; import org.thingsboard.server.common.msg.session.*; import akka.actor.ActorContext; import akka.event.LoggingAdapter; +import org.thingsboard.server.common.msg.session.ctrl.*; import org.thingsboard.server.common.msg.session.ex.SessionException; import java.util.HashMap; @@ -37,7 +36,8 @@ import java.util.Optional; class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor { - Map pendingMap = new HashMap<>(); + private boolean firstMsg = true; + private Map pendingMap = new HashMap<>(); private Optional currentTargetServer; private boolean subscribedToAttributeUpdates; private boolean subscribedToRpcCommands; @@ -49,6 +49,10 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor { @Override protected void processToDeviceActorMsg(ActorContext ctx, ToDeviceActorSessionMsg msg) { updateSessionCtx(msg, SessionType.ASYNC); + if (firstMsg) { + toDeviceMsg(new SessionOpenMsg()).ifPresent(m -> forwardToAppActor(ctx, m)); + firstMsg = false; + } ToDeviceActorMsg pendingMsg = toDeviceMsg(msg); FromDeviceMsg fromDeviceMsg = pendingMsg.getPayload(); switch (fromDeviceMsg.getMsgType()) { @@ -80,17 +84,21 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor { @Override public void processToDeviceMsg(ActorContext context, ToDeviceMsg msg) { try { - switch (msg.getMsgType()) { - case STATUS_CODE_RESPONSE: - case GET_ATTRIBUTES_RESPONSE: - ResponseMsg responseMsg = (ResponseMsg) msg; - if (responseMsg.getRequestId() >= 0) { - logger.debug("[{}] Pending request processed: {}", responseMsg.getRequestId(), responseMsg); - pendingMap.remove(responseMsg.getRequestId()); - } - break; + if (msg.getMsgType() != MsgType.SESSION_CLOSE) { + switch (msg.getMsgType()) { + case STATUS_CODE_RESPONSE: + case GET_ATTRIBUTES_RESPONSE: + ResponseMsg responseMsg = (ResponseMsg) msg; + if (responseMsg.getRequestId() >= 0) { + logger.debug("[{}] Pending request processed: {}", responseMsg.getRequestId(), responseMsg); + pendingMap.remove(responseMsg.getRequestId()); + } + break; + } + sessionCtx.onMsg(new BasicSessionActorToAdaptorMsg(this.sessionCtx, msg)); + } else { + sessionCtx.onMsg(org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg.onCredentialsRevoked(sessionCtx.getSessionId())); } - sessionCtx.onMsg(new BasicSessionActorToAdaptorMsg(this.sessionCtx, msg)); } catch (SessionException e) { logger.warning("Failed to push session response msg", e); } @@ -102,7 +110,7 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor { } protected void cleanupSession(ActorContext ctx) { - toDeviceMsg(new SessionCloseMsg()).ifPresent(msg -> forwardToAppActor(ctx, msg)); + toDeviceMsg(new SessionCloseMsg()).ifPresent(m -> forwardToAppActor(ctx, m)); } @Override @@ -110,6 +118,7 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor { if (pendingMap.size() > 0 || subscribedToAttributeUpdates || subscribedToRpcCommands) { Optional newTargetServer = systemContext.getRoutingService().resolve(getDeviceId()); if (!newTargetServer.equals(currentTargetServer)) { + firstMsg = true; currentTargetServer = newTargetServer; pendingMap.values().forEach(v -> { forwardToAppActor(context, v, currentTargetServer); diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java index afb35ac76d..9fb13d36ce 100644 --- a/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java @@ -52,7 +52,7 @@ class SyncMsgProcessor extends AbstractSessionActorMsgProcessor { public void processTimeoutMsg(ActorContext context, SessionTimeoutMsg msg) { if (pendingResponse) { try { - sessionCtx.onMsg(new SessionCloseMsg(sessionId, true)); + sessionCtx.onMsg(SessionCloseMsg.onTimeout(sessionId)); } catch (SessionException e) { logger.warning("Failed to push session close msg", e); } diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java index 1c0a7beba5..43416ce6d6 100644 --- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java +++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java @@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.exception.ThingsboardException; +import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg; @RestController @RequestMapping("/api") @@ -48,7 +49,7 @@ public class DeviceController extends BaseController { @PreAuthorize("hasAuthority('TENANT_ADMIN')") @RequestMapping(value = "/device", method = RequestMethod.POST) - @ResponseBody + @ResponseBody public Device saveDevice(@RequestBody Device device) throws ThingsboardException { try { device.setTenantId(getCurrentUser().getTenantId()); @@ -74,7 +75,7 @@ public class DeviceController extends BaseController { @PreAuthorize("hasAuthority('TENANT_ADMIN')") @RequestMapping(value = "/customer/{customerId}/device/{deviceId}", method = RequestMethod.POST) - @ResponseBody + @ResponseBody public Device assignDeviceToCustomer(@PathVariable("customerId") String strCustomerId, @PathVariable("deviceId") String strDeviceId) throws ThingsboardException { checkParameter("customerId", strCustomerId); @@ -85,7 +86,7 @@ public class DeviceController extends BaseController { DeviceId deviceId = new DeviceId(toUUID(strDeviceId)); checkDeviceId(deviceId); - + return checkNotNull(deviceService.assignDeviceToCustomer(deviceId, customerId)); } catch (Exception e) { throw handleException(e); @@ -94,7 +95,7 @@ public class DeviceController extends BaseController { @PreAuthorize("hasAuthority('TENANT_ADMIN')") @RequestMapping(value = "/customer/device/{deviceId}", method = RequestMethod.DELETE) - @ResponseBody + @ResponseBody public Device unassignDeviceFromCustomer(@PathVariable("deviceId") String strDeviceId) throws ThingsboardException { checkParameter("deviceId", strDeviceId); try { @@ -125,19 +126,21 @@ public class DeviceController extends BaseController { @PreAuthorize("hasAuthority('TENANT_ADMIN')") @RequestMapping(value = "/device/credentials", method = RequestMethod.POST) - @ResponseBody + @ResponseBody public DeviceCredentials saveDeviceCredentials(@RequestBody DeviceCredentials deviceCredentials) throws ThingsboardException { checkNotNull(deviceCredentials); try { checkDeviceId(deviceCredentials.getDeviceId()); - return checkNotNull(deviceCredentialsService.updateDeviceCredentials(deviceCredentials)); + DeviceCredentials result = checkNotNull(deviceCredentialsService.updateDeviceCredentials(deviceCredentials)); + actorService.onCredentialsUpdate(getCurrentUser().getTenantId(), deviceCredentials.getDeviceId()); + return result; } catch (Exception e) { throw handleException(e); } } @PreAuthorize("hasAuthority('TENANT_ADMIN')") - @RequestMapping(value = "/tenant/devices", params = { "limit" }, method = RequestMethod.GET) + @RequestMapping(value = "/tenant/devices", params = {"limit"}, method = RequestMethod.GET) @ResponseBody public TextPageData getTenantDevices( @RequestParam int limit, @@ -154,7 +157,7 @@ public class DeviceController extends BaseController { } @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')") - @RequestMapping(value = "/customer/{customerId}/devices", params = { "limit" }, method = RequestMethod.GET) + @RequestMapping(value = "/customer/{customerId}/devices", params = {"limit"}, method = RequestMethod.GET) @ResponseBody public TextPageData getCustomerDevices( @PathVariable("customerId") String strCustomerId, diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index aa7fcc10c9..0695413f6b 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -29,7 +29,7 @@ server: # Zookeeper connection parameters. Used for service discovery. zk: # Enable/disable zookeeper discovery service. - enabled: "${ZOOKEEPER_ENABLED:false}" + enabled: "${ZOOKEEPER_ENABLED:true}" # Zookeeper connect string url: "${ZOOKEEPER_URL:localhost:2181}" # Zookeeper retry interval in milliseconds diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionCloseNotification.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionCloseNotification.java new file mode 100644 index 0000000000..3e96e40145 --- /dev/null +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionCloseNotification.java @@ -0,0 +1,38 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.msg.core; + +import lombok.ToString; +import org.thingsboard.server.common.msg.kv.AttributesKVMsg; +import org.thingsboard.server.common.msg.session.MsgType; +import org.thingsboard.server.common.msg.session.ToDeviceMsg; + +@ToString +public class SessionCloseNotification implements ToDeviceMsg { + + private static final long serialVersionUID = 1L; + + @Override + public boolean isSuccess() { + return true; + } + + @Override + public MsgType getMsgType() { + return MsgType.SESSION_CLOSE; + } + +} diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionOpenMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionOpenMsg.java new file mode 100644 index 0000000000..d18dc9f871 --- /dev/null +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionOpenMsg.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.msg.core; + +import org.thingsboard.server.common.msg.session.FromDeviceMsg; +import org.thingsboard.server.common.msg.session.MsgType; + +/** + * @author Andrew Shvayka + */ +public class SessionOpenMsg implements FromDeviceMsg { + @Override + public MsgType getMsgType() { + return MsgType.SESSION_OPEN; + } +} diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/MsgType.java index 1b91425462..549a143c6b 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/MsgType.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/MsgType.java @@ -28,7 +28,7 @@ public enum MsgType { RULE_ENGINE_ERROR, - SESSION_CLOSE; + SESSION_OPEN, SESSION_CLOSE; private final boolean requiresRulesProcessing; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java index d1885273cb..03b611e6a1 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java @@ -21,11 +21,25 @@ import org.thingsboard.server.common.msg.session.SessionCtrlMsg; public class SessionCloseMsg implements SessionCtrlMsg { private final SessionId sessionId; + private final boolean revoked; private final boolean timeout; - public SessionCloseMsg(SessionId sessionId, boolean timeout) { + public static SessionCloseMsg onError(SessionId sessionId) { + return new SessionCloseMsg(sessionId, false, false); + } + + public static SessionCloseMsg onTimeout(SessionId sessionId) { + return new SessionCloseMsg(sessionId, false, true); + } + + public static SessionCloseMsg onCredentialsRevoked(SessionId sessionId) { + return new SessionCloseMsg(sessionId, true, false); + } + + private SessionCloseMsg(SessionId sessionId, boolean unauthorized, boolean timeout) { super(); this.sessionId = sessionId; + this.revoked = unauthorized; this.timeout = timeout; } @@ -34,6 +48,10 @@ public class SessionCloseMsg implements SessionCtrlMsg { return sessionId; } + public boolean isCredentialsRevoked() { + return revoked; + } + public boolean isTimeout() { return timeout; } diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceCredentialsUpdateNotificationMsg.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceCredentialsUpdateNotificationMsg.java new file mode 100644 index 0000000000..0104824a9d --- /dev/null +++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceCredentialsUpdateNotificationMsg.java @@ -0,0 +1,36 @@ +/** + * Copyright © 2016 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.api.device; + +import lombok.Data; +import lombok.Getter; +import lombok.ToString; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.AttributeKey; + +import java.util.Set; + +/** + * @author Andrew Shvayka + */ +@Data +public class DeviceCredentialsUpdateNotificationMsg implements ToDeviceActorNotificationMsg { + + private final TenantId tenantId; + private final DeviceId deviceId; + +} diff --git a/tools/src/main/resources/test.properties b/tools/src/main/resources/test.properties new file mode 100644 index 0000000000..6e9ed89cf9 --- /dev/null +++ b/tools/src/main/resources/test.properties @@ -0,0 +1,5 @@ +restUrl=http://localhost:8080 +mqttUrls=tcp://localhost:1883 +deviceCount=1 +durationMs=60000 +iterationIntervalMs=1000 diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java index a61f0bf864..6f6e35a66c 100644 --- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java +++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java @@ -36,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.atomic.AtomicInteger; + @Slf4j public class CoapSessionCtx extends DeviceAwareSessionContext { @@ -87,6 +88,8 @@ public class CoapSessionCtx extends DeviceAwareSessionContext { private void onSessionClose(SessionCloseMsg msg) { if (msg.isTimeout()) { exchange.respond(ResponseCode.SERVICE_UNAVAILABLE); + } else if (msg.isCredentialsRevoked()) { + exchange.respond(ResponseCode.UNAUTHORIZED); } else { exchange.respond(ResponseCode.INTERNAL_SERVER_ERROR); } @@ -120,7 +123,7 @@ public class CoapSessionCtx extends DeviceAwareSessionContext { public void close() { log.info("[{}] Closing processing context. Timeout: {}", sessionId, exchange.advanced().isTimedOut()); - processor.process(new SessionCloseMsg(sessionId, exchange.advanced().isTimedOut())); + processor.process(exchange.advanced().isTimedOut() ? SessionCloseMsg.onTimeout(sessionId) : SessionCloseMsg.onError(sessionId)); } @Override diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index e1bb45c872..b4d8108a94 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -210,7 +210,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } private void processDisconnect(ChannelHandlerContext ctx) { - processor.process(new SessionCloseMsg(sessionCtx.getSessionId(), false)); ctx.close(); } @@ -255,6 +254,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void operationComplete(Future future) throws Exception { - processor.process(new SessionCloseMsg(sessionCtx.getSessionId(), false)); + processor.process(SessionCloseMsg.onError(sessionCtx.getSessionId())); } } diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttSessionCtx.java index 5cae9f5e8f..f653682da0 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttSessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttSessionCtx.java @@ -16,12 +16,13 @@ package org.thingsboard.server.transport.mqtt.session; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.*; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.id.SessionId; import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg; import org.thingsboard.server.common.msg.session.SessionCtrlMsg; import org.thingsboard.server.common.msg.session.SessionType; +import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; import org.thingsboard.server.common.msg.session.ex.SessionException; import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.adaptor.AdaptorException; @@ -75,7 +76,10 @@ public class MqttSessionCtx extends DeviceAwareSessionContext { @Override public void onMsg(SessionCtrlMsg msg) throws SessionException { - + if (msg instanceof SessionCloseMsg) { + pushToNetwork(new MqttMessage(new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0))); + channel.close(); + } } @Override From f6595abbed13f0337cfb0b1743d51f8a7931586e Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Tue, 20 Dec 2016 12:52:48 +0200 Subject: [PATCH 2/2] Build Fix --- application/src/main/resources/thingsboard.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 0695413f6b..aa7fcc10c9 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -29,7 +29,7 @@ server: # Zookeeper connection parameters. Used for service discovery. zk: # Enable/disable zookeeper discovery service. - enabled: "${ZOOKEEPER_ENABLED:true}" + enabled: "${ZOOKEEPER_ENABLED:false}" # Zookeeper connect string url: "${ZOOKEEPER_URL:localhost:2181}" # Zookeeper retry interval in milliseconds