Merge pull request #16 from thingsboard/feature/credentials

Credentials revocation
This commit is contained in:
Andrew Shvayka 2016-12-20 13:12:56 +02:00 committed by GitHub
commit b04bea6758
16 changed files with 213 additions and 40 deletions

View File

@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg;
import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
import org.thingsboard.server.extensions.api.plugins.msg.*;
@ -58,6 +59,8 @@ public class DeviceActor extends ContextAwareActor {
processor.processAttributesUpdate(context(), (DeviceAttributesEventNotificationMsg) msg);
} else if (msg instanceof ToDeviceRpcRequestPluginMsg) {
processor.processRpcRequest(context(), (ToDeviceRpcRequestPluginMsg) msg);
} else if (msg instanceof DeviceCredentialsUpdateNotificationMsg){
processor.processCredentialsUpdate(context(), (DeviceCredentialsUpdateNotificationMsg) msg);
}
} else if (msg instanceof TimeoutMsg) {
processor.processTimeout(context(), (TimeoutMsg) msg);

View File

@ -32,13 +32,7 @@ import org.thingsboard.server.common.data.kv.AttributeKey;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.core.AttributesUpdateNotification;
import org.thingsboard.server.common.msg.core.BasicCommandAckResponse;
import org.thingsboard.server.common.msg.core.BasicToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.core.SessionCloseMsg;
import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg;
import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.core.*;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
@ -47,6 +41,7 @@ import org.thingsboard.server.common.msg.session.SessionType;
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
import org.thingsboard.server.extensions.api.device.DeviceAttributes;
import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg;
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
import org.thingsboard.server.extensions.api.plugins.msg.TimeoutIntMsg;
@ -74,6 +69,7 @@ import java.util.stream.Collectors;
public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
private final DeviceId deviceId;
private final Map<SessionId, SessionInfo> sessions;
private final Map<SessionId, SessionInfo> attributeSubscriptions;
private final Map<SessionId, SessionInfo> rpcSubscriptions;
@ -85,6 +81,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
public DeviceActorMessageProcessor(ActorSystemContext systemContext, LoggingAdapter logger, DeviceId deviceId) {
super(systemContext, logger);
this.deviceId = deviceId;
this.sessions = new HashMap<>();
this.attributeSubscriptions = new HashMap<>();
this.rpcSubscriptions = new HashMap<>();
this.rpcPendingMap = new HashMap<>();
@ -342,8 +339,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
private void processSessionStateMsgs(ToDeviceActorMsg msg) {
SessionId sessionId = msg.getSessionId();
FromDeviceMsg inMsg = msg.getPayload();
if (inMsg instanceof SessionCloseMsg) {
if (inMsg instanceof SessionOpenMsg) {
logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
sessions.put(sessionId, new SessionInfo(SessionType.ASYNC, msg.getServerAddress()));
} else if (inMsg instanceof SessionCloseMsg) {
logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
sessions.remove(sessionId);
attributeSubscriptions.remove(sessionId);
rpcSubscriptions.remove(sessionId);
}
@ -363,4 +364,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
return systemContext.getAttributesService().findAll(this.deviceId, attributeType);
}
public void processCredentialsUpdate(ActorContext context, DeviceCredentialsUpdateNotificationMsg msg) {
sessions.forEach((k, v) -> {
sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(new SessionCloseNotification(), k), v.getServer());
});
attributeSubscriptions.clear();
rpcSubscriptions.clear();
}
}

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.actors.service;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.PluginId;
import org.thingsboard.server.common.data.id.RuleId;
import org.thingsboard.server.common.data.id.TenantId;
@ -28,4 +29,6 @@ public interface ActorService extends SessionMsgProcessor, WebSocketMsgProcessor
void onPluginStateChange(TenantId tenantId, PluginId pluginId, ComponentLifecycleEvent state);
void onRuleStateChange(TenantId tenantId, RuleId ruleId, ComponentLifecycleEvent state);
void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId);
}

View File

@ -32,16 +32,19 @@ import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
import org.thingsboard.server.actors.rpc.RpcSessionTellMsg;
import org.thingsboard.server.actors.session.SessionManagerActor;
import org.thingsboard.server.actors.stats.StatsActor;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.PluginId;
import org.thingsboard.server.common.data.id.RuleId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg;
import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
import org.thingsboard.server.extensions.api.plugins.msg.ToPluginActorMsg;
import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
@ -56,6 +59,7 @@ import scala.concurrent.duration.Duration;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Optional;
@Service
@Slf4j
@ -221,6 +225,17 @@ public class DefaultActorService implements ActorService {
broadcast(ComponentLifecycleMsg.forRule(tenantId, ruleId, state));
}
@Override
public void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId) {
DeviceCredentialsUpdateNotificationMsg msg = new DeviceCredentialsUpdateNotificationMsg(tenantId, deviceId);
Optional<ServerAddress> address = actorContext.getRoutingService().resolve(deviceId);
if (address.isPresent()) {
rpcService.tell(address.get(), msg);
} else {
onMsg(msg);
}
}
public void broadcast(ToAllNodesMsg msg) {
rpcService.broadcast(msg);
appActor.tell(msg, ActorRef.noSender());

View File

@ -20,15 +20,14 @@ import org.thingsboard.server.actors.shared.SessionTimeoutMsg;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.core.AttributesSubscribeMsg;
import org.thingsboard.server.common.msg.core.ResponseMsg;
import org.thingsboard.server.common.msg.core.RpcSubscribeMsg;
import org.thingsboard.server.common.msg.core.*;
import org.thingsboard.server.common.msg.core.SessionCloseMsg;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.common.msg.session.*;
import akka.actor.ActorContext;
import akka.event.LoggingAdapter;
import org.thingsboard.server.common.msg.session.ctrl.*;
import org.thingsboard.server.common.msg.session.ex.SessionException;
import java.util.HashMap;
@ -37,7 +36,8 @@ import java.util.Optional;
class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
Map<Integer, ToDeviceActorMsg> pendingMap = new HashMap<>();
private boolean firstMsg = true;
private Map<Integer, ToDeviceActorMsg> pendingMap = new HashMap<>();
private Optional<ServerAddress> currentTargetServer;
private boolean subscribedToAttributeUpdates;
private boolean subscribedToRpcCommands;
@ -49,6 +49,10 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
@Override
protected void processToDeviceActorMsg(ActorContext ctx, ToDeviceActorSessionMsg msg) {
updateSessionCtx(msg, SessionType.ASYNC);
if (firstMsg) {
toDeviceMsg(new SessionOpenMsg()).ifPresent(m -> forwardToAppActor(ctx, m));
firstMsg = false;
}
ToDeviceActorMsg pendingMsg = toDeviceMsg(msg);
FromDeviceMsg fromDeviceMsg = pendingMsg.getPayload();
switch (fromDeviceMsg.getMsgType()) {
@ -80,6 +84,7 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
@Override
public void processToDeviceMsg(ActorContext context, ToDeviceMsg msg) {
try {
if (msg.getMsgType() != MsgType.SESSION_CLOSE) {
switch (msg.getMsgType()) {
case STATUS_CODE_RESPONSE:
case GET_ATTRIBUTES_RESPONSE:
@ -91,6 +96,9 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
break;
}
sessionCtx.onMsg(new BasicSessionActorToAdaptorMsg(this.sessionCtx, msg));
} else {
sessionCtx.onMsg(org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg.onCredentialsRevoked(sessionCtx.getSessionId()));
}
} catch (SessionException e) {
logger.warning("Failed to push session response msg", e);
}
@ -102,7 +110,7 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
}
protected void cleanupSession(ActorContext ctx) {
toDeviceMsg(new SessionCloseMsg()).ifPresent(msg -> forwardToAppActor(ctx, msg));
toDeviceMsg(new SessionCloseMsg()).ifPresent(m -> forwardToAppActor(ctx, m));
}
@Override
@ -110,6 +118,7 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
if (pendingMap.size() > 0 || subscribedToAttributeUpdates || subscribedToRpcCommands) {
Optional<ServerAddress> newTargetServer = systemContext.getRoutingService().resolve(getDeviceId());
if (!newTargetServer.equals(currentTargetServer)) {
firstMsg = true;
currentTargetServer = newTargetServer;
pendingMap.values().forEach(v -> {
forwardToAppActor(context, v, currentTargetServer);

View File

@ -52,7 +52,7 @@ class SyncMsgProcessor extends AbstractSessionActorMsgProcessor {
public void processTimeoutMsg(ActorContext context, SessionTimeoutMsg msg) {
if (pendingResponse) {
try {
sessionCtx.onMsg(new SessionCloseMsg(sessionId, true));
sessionCtx.onMsg(SessionCloseMsg.onTimeout(sessionId));
} catch (SessionException e) {
logger.warning("Failed to push session close msg", e);
}

View File

@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.exception.ThingsboardException;
import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg;
@RestController
@RequestMapping("/api")
@ -130,14 +131,16 @@ public class DeviceController extends BaseController {
checkNotNull(deviceCredentials);
try {
checkDeviceId(deviceCredentials.getDeviceId());
return checkNotNull(deviceCredentialsService.updateDeviceCredentials(deviceCredentials));
DeviceCredentials result = checkNotNull(deviceCredentialsService.updateDeviceCredentials(deviceCredentials));
actorService.onCredentialsUpdate(getCurrentUser().getTenantId(), deviceCredentials.getDeviceId());
return result;
} catch (Exception e) {
throw handleException(e);
}
}
@PreAuthorize("hasAuthority('TENANT_ADMIN')")
@RequestMapping(value = "/tenant/devices", params = { "limit" }, method = RequestMethod.GET)
@RequestMapping(value = "/tenant/devices", params = {"limit"}, method = RequestMethod.GET)
@ResponseBody
public TextPageData<Device> getTenantDevices(
@RequestParam int limit,
@ -154,7 +157,7 @@ public class DeviceController extends BaseController {
}
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/customer/{customerId}/devices", params = { "limit" }, method = RequestMethod.GET)
@RequestMapping(value = "/customer/{customerId}/devices", params = {"limit"}, method = RequestMethod.GET)
@ResponseBody
public TextPageData<Device> getCustomerDevices(
@PathVariable("customerId") String strCustomerId,

View File

@ -0,0 +1,38 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg.core;
import lombok.ToString;
import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
import org.thingsboard.server.common.msg.session.MsgType;
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
@ToString
public class SessionCloseNotification implements ToDeviceMsg {
private static final long serialVersionUID = 1L;
@Override
public boolean isSuccess() {
return true;
}
@Override
public MsgType getMsgType() {
return MsgType.SESSION_CLOSE;
}
}

View File

@ -0,0 +1,29 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg.core;
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
import org.thingsboard.server.common.msg.session.MsgType;
/**
* @author Andrew Shvayka
*/
public class SessionOpenMsg implements FromDeviceMsg {
@Override
public MsgType getMsgType() {
return MsgType.SESSION_OPEN;
}
}

View File

@ -28,7 +28,7 @@ public enum MsgType {
RULE_ENGINE_ERROR,
SESSION_CLOSE;
SESSION_OPEN, SESSION_CLOSE;
private final boolean requiresRulesProcessing;

View File

@ -21,11 +21,25 @@ import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
public class SessionCloseMsg implements SessionCtrlMsg {
private final SessionId sessionId;
private final boolean revoked;
private final boolean timeout;
public SessionCloseMsg(SessionId sessionId, boolean timeout) {
public static SessionCloseMsg onError(SessionId sessionId) {
return new SessionCloseMsg(sessionId, false, false);
}
public static SessionCloseMsg onTimeout(SessionId sessionId) {
return new SessionCloseMsg(sessionId, false, true);
}
public static SessionCloseMsg onCredentialsRevoked(SessionId sessionId) {
return new SessionCloseMsg(sessionId, true, false);
}
private SessionCloseMsg(SessionId sessionId, boolean unauthorized, boolean timeout) {
super();
this.sessionId = sessionId;
this.revoked = unauthorized;
this.timeout = timeout;
}
@ -34,6 +48,10 @@ public class SessionCloseMsg implements SessionCtrlMsg {
return sessionId;
}
public boolean isCredentialsRevoked() {
return revoked;
}
public boolean isTimeout() {
return timeout;
}

View File

@ -0,0 +1,36 @@
/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.api.device;
import lombok.Data;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKey;
import java.util.Set;
/**
* @author Andrew Shvayka
*/
@Data
public class DeviceCredentialsUpdateNotificationMsg implements ToDeviceActorNotificationMsg {
private final TenantId tenantId;
private final DeviceId deviceId;
}

View File

@ -0,0 +1,5 @@
restUrl=http://localhost:8080
mqttUrls=tcp://localhost:1883
deviceCount=1
durationMs=60000
iterationIntervalMs=1000

View File

@ -36,6 +36,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class CoapSessionCtx extends DeviceAwareSessionContext {
@ -87,6 +88,8 @@ public class CoapSessionCtx extends DeviceAwareSessionContext {
private void onSessionClose(SessionCloseMsg msg) {
if (msg.isTimeout()) {
exchange.respond(ResponseCode.SERVICE_UNAVAILABLE);
} else if (msg.isCredentialsRevoked()) {
exchange.respond(ResponseCode.UNAUTHORIZED);
} else {
exchange.respond(ResponseCode.INTERNAL_SERVER_ERROR);
}
@ -120,7 +123,7 @@ public class CoapSessionCtx extends DeviceAwareSessionContext {
public void close() {
log.info("[{}] Closing processing context. Timeout: {}", sessionId, exchange.advanced().isTimedOut());
processor.process(new SessionCloseMsg(sessionId, exchange.advanced().isTimedOut()));
processor.process(exchange.advanced().isTimedOut() ? SessionCloseMsg.onTimeout(sessionId) : SessionCloseMsg.onError(sessionId));
}
@Override

View File

@ -210,7 +210,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
private void processDisconnect(ChannelHandlerContext ctx) {
processor.process(new SessionCloseMsg(sessionCtx.getSessionId(), false));
ctx.close();
}
@ -255,6 +254,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
processor.process(new SessionCloseMsg(sessionCtx.getSessionId(), false));
processor.process(SessionCloseMsg.onError(sessionCtx.getSessionId()));
}
}

View File

@ -16,12 +16,13 @@
package org.thingsboard.server.transport.mqtt.session;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.*;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
import org.thingsboard.server.common.msg.session.SessionType;
import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
import org.thingsboard.server.common.msg.session.ex.SessionException;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
@ -75,7 +76,10 @@ public class MqttSessionCtx extends DeviceAwareSessionContext {
@Override
public void onMsg(SessionCtrlMsg msg) throws SessionException {
if (msg instanceof SessionCloseMsg) {
pushToNetwork(new MqttMessage(new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0)));
channel.close();
}
}
@Override