OTA Info store and client lookup

This commit is contained in:
Andrii Shvaika 2021-06-24 19:31:16 +03:00
parent 78f9194669
commit 1b391847d3
18 changed files with 416 additions and 136 deletions

View File

@ -181,6 +181,7 @@ public class DefaultLwM2MAttributesService implements LwM2MAttributesService {
} }
} }
}); });
clientContext.update(lwM2MClient);
// #2.1 // #2.1
lwM2MClient.getSharedAttributes().forEach((pathIdVer, tsKvProto) -> { lwM2MClient.getSharedAttributes().forEach((pathIdVer, tsKvProto) -> {
this.pushUpdateToClientIfNeeded(lwM2MClient, this.getResourceValueFormatKv(lwM2MClient, pathIdVer), this.pushUpdateToClientIfNeeded(lwM2MClient, this.getResourceValueFormatKv(lwM2MClient, pathIdVer),

View File

@ -142,6 +142,16 @@ public class LwM2mClient implements Serializable {
builder.setDeviceType(deviceProfile.getName()); builder.setDeviceType(deviceProfile.getName());
} }
public void refreshSessionId(String nodeId) {
UUID newId = UUID.randomUUID();
SessionInfoProto.Builder builder = SessionInfoProto.newBuilder().mergeFrom(session);
builder.setNodeId(nodeId);
builder.setSessionIdMSB(newId.getMostSignificantBits());
builder.setSessionIdLSB(newId.getLeastSignificantBits());
this.session = builder.build();
}
private SessionInfoProto createSession(String nodeId, UUID sessionId, ValidateDeviceCredentialsResponse msg) { private SessionInfoProto createSession(String nodeId, UUID sessionId, ValidateDeviceCredentialsResponse msg) {
return SessionInfoProto.newBuilder() return SessionInfoProto.newBuilder()
.setNodeId(nodeId) .setNodeId(nodeId)

View File

@ -55,4 +55,5 @@ public interface LwM2mClientContext {
void registerClient(Registration registration, ValidateDeviceCredentialsResponse credentials); void registerClient(Registration registration, ValidateDeviceCredentialsResponse credentials);
void update(LwM2mClient lwM2MClient);
} }

View File

@ -24,6 +24,7 @@ import org.eclipse.leshan.server.registration.Registration;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
@ -31,6 +32,7 @@ import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig;
import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MSecurityInfo; import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MSecurityInfo;
import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportContext; import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportContext;
import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil; import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil;
import org.thingsboard.server.transport.lwm2m.server.session.LwM2MSessionManager;
import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MClientStore; import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MClientStore;
import org.thingsboard.server.transport.lwm2m.server.store.TbMainSecurityStore; import org.thingsboard.server.transport.lwm2m.server.store.TbMainSecurityStore;
@ -58,6 +60,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
private final LwM2MTransportServerConfig config; private final LwM2MTransportServerConfig config;
private final TbMainSecurityStore securityStore; private final TbMainSecurityStore securityStore;
private final TbLwM2MClientStore clientStore; private final TbLwM2MClientStore clientStore;
private final LwM2MSessionManager sessionManager;
private final Map<String, LwM2mClient> lwM2mClientsByEndpoint = new ConcurrentHashMap<>(); private final Map<String, LwM2mClient> lwM2mClientsByEndpoint = new ConcurrentHashMap<>();
private final Map<String, LwM2mClient> lwM2mClientsByRegistrationId = new ConcurrentHashMap<>(); private final Map<String, LwM2mClient> lwM2mClientsByRegistrationId = new ConcurrentHashMap<>();
private final Map<UUID, Lwm2mDeviceProfileTransportConfiguration> profiles = new ConcurrentHashMap<>(); private final Map<UUID, Lwm2mDeviceProfileTransportConfiguration> profiles = new ConcurrentHashMap<>();
@ -66,14 +69,23 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
public LwM2mClient getClientByEndpoint(String endpoint) { public LwM2mClient getClientByEndpoint(String endpoint) {
return lwM2mClientsByEndpoint.computeIfAbsent(endpoint, ep -> { return lwM2mClientsByEndpoint.computeIfAbsent(endpoint, ep -> {
LwM2mClient client = clientStore.get(ep); LwM2mClient client = clientStore.get(ep);
String nodeId = context.getNodeId();
if (client == null) { if (client == null) {
log.info("[{}] initialized new client.", endpoint); log.info("[{}] initialized new client.", endpoint);
client = new LwM2mClient(context.getNodeId(), ep); client = new LwM2mClient(nodeId, ep);
} else { } else {
log.debug("[{}] fetched client from store: {}", endpoint, client); log.debug("[{}] fetched client from store: {}", endpoint, client);
boolean updated = false;
if (client.getRegistration() != null) { if (client.getRegistration() != null) {
lwM2mClientsByRegistrationId.put(client.getRegistration().getId(), client); lwM2mClientsByRegistrationId.put(client.getRegistration().getId(), client);
//TODO: create ThingsBoard session. }
if (client.getSession() != null) {
client.refreshSessionId(nodeId);
sessionManager.register(client.getSession());
updated = true;
}
if (updated) {
clientStore.put(client);
} }
} }
return client; return client;
@ -81,15 +93,15 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
} }
@Override @Override
public Optional<TransportProtos.SessionInfoProto> register(LwM2mClient lwM2MClient, Registration registration) throws LwM2MClientStateException { public Optional<TransportProtos.SessionInfoProto> register(LwM2mClient client, Registration registration) throws LwM2MClientStateException {
TransportProtos.SessionInfoProto oldSession = null; TransportProtos.SessionInfoProto oldSession = null;
lwM2MClient.lock(); client.lock();
try { try {
if (LwM2MClientState.UNREGISTERED.equals(lwM2MClient.getState())) { if (LwM2MClientState.UNREGISTERED.equals(client.getState())) {
throw new LwM2MClientStateException(lwM2MClient.getState(), "Client is in invalid state."); throw new LwM2MClientStateException(client.getState(), "Client is in invalid state.");
} }
oldSession = lwM2MClient.getSession(); oldSession = client.getSession();
TbLwM2MSecurityInfo securityInfo = securityStore.getTbLwM2MSecurityInfoByEndpoint(lwM2MClient.getEndpoint()); TbLwM2MSecurityInfo securityInfo = securityStore.getTbLwM2MSecurityInfoByEndpoint(client.getEndpoint());
if (securityInfo.getSecurityMode() != null) { if (securityInfo.getSecurityMode() != null) {
if (SecurityMode.X509.equals(securityInfo.getSecurityMode())) { if (SecurityMode.X509.equals(securityInfo.getSecurityMode())) {
securityStore.registerX509(registration.getEndpoint(), registration.getId()); securityStore.registerX509(registration.getEndpoint(), registration.getId());
@ -97,57 +109,57 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
if (securityInfo.getDeviceProfile() != null) { if (securityInfo.getDeviceProfile() != null) {
profileUpdate(securityInfo.getDeviceProfile()); profileUpdate(securityInfo.getDeviceProfile());
if (securityInfo.getSecurityInfo() != null) { if (securityInfo.getSecurityInfo() != null) {
lwM2MClient.init(securityInfo.getMsg(), UUID.randomUUID()); client.init(securityInfo.getMsg(), UUID.randomUUID());
} else if (NO_SEC.equals(securityInfo.getSecurityMode())) { } else if (NO_SEC.equals(securityInfo.getSecurityMode())) {
lwM2MClient.init(securityInfo.getMsg(), UUID.randomUUID()); client.init(securityInfo.getMsg(), UUID.randomUUID());
} else { } else {
throw new RuntimeException(String.format("Registration failed: device %s not found.", lwM2MClient.getEndpoint())); throw new RuntimeException(String.format("Registration failed: device %s not found.", client.getEndpoint()));
} }
} else { } else {
throw new RuntimeException(String.format("Registration failed: device %s not found.", lwM2MClient.getEndpoint())); throw new RuntimeException(String.format("Registration failed: device %s not found.", client.getEndpoint()));
} }
} else { } else {
throw new RuntimeException(String.format("Registration failed: FORBIDDEN, endpointId: %s", lwM2MClient.getEndpoint())); throw new RuntimeException(String.format("Registration failed: FORBIDDEN, endpointId: %s", client.getEndpoint()));
} }
lwM2MClient.setRegistration(registration); client.setRegistration(registration);
this.lwM2mClientsByRegistrationId.put(registration.getId(), lwM2MClient); this.lwM2mClientsByRegistrationId.put(registration.getId(), client);
lwM2MClient.setState(LwM2MClientState.REGISTERED); client.setState(LwM2MClientState.REGISTERED);
clientStore.put(lwM2MClient); clientStore.put(client);
} finally { } finally {
lwM2MClient.unlock(); client.unlock();
} }
return Optional.ofNullable(oldSession); return Optional.ofNullable(oldSession);
} }
@Override @Override
public void updateRegistration(LwM2mClient lwM2MClient, Registration registration) throws LwM2MClientStateException { public void updateRegistration(LwM2mClient client, Registration registration) throws LwM2MClientStateException {
lwM2MClient.lock(); client.lock();
try { try {
if (!LwM2MClientState.REGISTERED.equals(lwM2MClient.getState())) { if (!LwM2MClientState.REGISTERED.equals(client.getState())) {
throw new LwM2MClientStateException(lwM2MClient.getState(), "Client is in invalid state."); throw new LwM2MClientStateException(client.getState(), "Client is in invalid state.");
} }
lwM2MClient.setRegistration(registration); client.setRegistration(registration);
clientStore.put(lwM2MClient); clientStore.put(client);
} finally { } finally {
lwM2MClient.unlock(); client.unlock();
} }
} }
@Override @Override
public void unregister(LwM2mClient lwM2MClient, Registration registration) throws LwM2MClientStateException { public void unregister(LwM2mClient client, Registration registration) throws LwM2MClientStateException {
lwM2MClient.lock(); client.lock();
try { try {
if (!LwM2MClientState.REGISTERED.equals(lwM2MClient.getState())) { if (!LwM2MClientState.REGISTERED.equals(client.getState())) {
throw new LwM2MClientStateException(lwM2MClient.getState(), "Client is in invalid state."); throw new LwM2MClientStateException(client.getState(), "Client is in invalid state.");
} }
lwM2mClientsByRegistrationId.remove(registration.getId()); lwM2mClientsByRegistrationId.remove(registration.getId());
Registration currentRegistration = lwM2MClient.getRegistration(); Registration currentRegistration = client.getRegistration();
if (currentRegistration.getId().equals(registration.getId())) { if (currentRegistration.getId().equals(registration.getId())) {
lwM2MClient.setState(LwM2MClientState.UNREGISTERED); client.setState(LwM2MClientState.UNREGISTERED);
lwM2mClientsByEndpoint.remove(lwM2MClient.getEndpoint()); lwM2mClientsByEndpoint.remove(client.getEndpoint());
this.securityStore.remove(lwM2MClient.getEndpoint(), registration.getId()); this.securityStore.remove(client.getEndpoint(), registration.getId());
clientStore.remove(lwM2MClient.getEndpoint()); clientStore.remove(client.getEndpoint());
UUID profileId = lwM2MClient.getProfileId(); UUID profileId = client.getProfileId();
if (profileId != null) { if (profileId != null) {
Optional<LwM2mClient> otherClients = lwM2mClientsByRegistrationId.values().stream().filter(e -> e.getProfileId().equals(profileId)).findFirst(); Optional<LwM2mClient> otherClients = lwM2mClientsByRegistrationId.values().stream().filter(e -> e.getProfileId().equals(profileId)).findFirst();
if (otherClients.isEmpty()) { if (otherClients.isEmpty()) {
@ -155,19 +167,19 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
} }
} }
} else { } else {
throw new LwM2MClientStateException(lwM2MClient.getState(), "Client has different registration."); throw new LwM2MClientStateException(client.getState(), "Client has different registration.");
} }
} finally { } finally {
lwM2MClient.unlock(); client.unlock();
} }
} }
@Override @Override
public LwM2mClient getClientBySessionInfo(TransportProtos.SessionInfoProto sessionInfo) { public LwM2mClient getClientBySessionInfo(TransportProtos.SessionInfoProto sessionInfo) {
LwM2mClient lwM2mClient = null; LwM2mClient lwM2mClient = null;
UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
Predicate<LwM2mClient> isClientFilter = c -> Predicate<LwM2mClient> isClientFilter = c ->
(new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB())) sessionId.equals((new UUID(c.getSession().getSessionIdMSB(), c.getSession().getSessionIdLSB())));
.equals((new UUID(c.getSession().getSessionIdMSB(), c.getSession().getSessionIdLSB())));
if (this.lwM2mClientsByEndpoint.size() > 0) { if (this.lwM2mClientsByEndpoint.size() > 0) {
lwM2mClient = this.lwM2mClientsByEndpoint.values().stream().filter(isClientFilter).findAny().orElse(null); lwM2mClient = this.lwM2mClientsByEndpoint.values().stream().filter(isClientFilter).findAny().orElse(null);
} }
@ -175,19 +187,17 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
lwM2mClient = this.lwM2mClientsByRegistrationId.values().stream().filter(isClientFilter).findAny().orElse(null); lwM2mClient = this.lwM2mClientsByRegistrationId.values().stream().filter(isClientFilter).findAny().orElse(null);
} }
if (lwM2mClient == null) { if (lwM2mClient == null) {
log.warn("Device TimeOut? lwM2mClient is null."); log.error("[{}] Failed to lookup client by session id.", sessionId);
log.warn("SessionInfo input [{}], lwM2mClientsByEndpoint size: [{}] lwM2mClientsByRegistrationId: [{}]", sessionInfo, lwM2mClientsByEndpoint.values(), lwM2mClientsByRegistrationId.values());
log.error("", new RuntimeException());
} }
return lwM2mClient; return lwM2mClient;
} }
@Override @Override
public String getObjectIdByKeyNameFromProfile(LwM2mClient lwM2mClient, String keyName) { public String getObjectIdByKeyNameFromProfile(LwM2mClient client, String keyName) {
Lwm2mDeviceProfileTransportConfiguration profile = getProfile(lwM2mClient.getProfileId()); Lwm2mDeviceProfileTransportConfiguration profile = getProfile(client.getProfileId());
return profile.getObserveAttr().getKeyName().entrySet().stream() return profile.getObserveAttr().getKeyName().entrySet().stream()
.filter(e -> e.getValue().equals(keyName) && validateResourceInModel(lwM2mClient, e.getKey(), false)).findFirst().orElseThrow( .filter(e -> e.getValue().equals(keyName) && validateResourceInModel(client, e.getKey(), false)).findFirst().orElseThrow(
() -> new IllegalArgumentException(keyName + " is not configured in the device profile!") () -> new IllegalArgumentException(keyName + " is not configured in the device profile!")
).getKey(); ).getKey();
} }
@ -204,6 +214,16 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
profileUpdate(credentials.getDeviceProfile()); profileUpdate(credentials.getDeviceProfile());
} }
@Override
public void update(LwM2mClient client) {
client.lock();
try {
clientStore.put(client);
} finally {
client.unlock();
}
}
@Override @Override
public Collection<LwM2mClient> getLwM2mClients() { public Collection<LwM2mClient> getLwM2mClients() {
return lwM2mClientsByEndpoint.values(); return lwM2mClientsByEndpoint.values();
@ -221,9 +241,9 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
@Override @Override
public Lwm2mDeviceProfileTransportConfiguration profileUpdate(DeviceProfile deviceProfile) { public Lwm2mDeviceProfileTransportConfiguration profileUpdate(DeviceProfile deviceProfile) {
Lwm2mDeviceProfileTransportConfiguration lwM2MClientProfile = LwM2mTransportUtil.toLwM2MClientProfile(deviceProfile); Lwm2mDeviceProfileTransportConfiguration clientProfile = LwM2mTransportUtil.toLwM2MClientProfile(deviceProfile);
profiles.put(deviceProfile.getUuidId(), lwM2MClientProfile); profiles.put(deviceProfile.getUuidId(), clientProfile);
return lwM2MClientProfile; return clientProfile;
} }
@Override @Override

View File

@ -52,6 +52,7 @@ import org.thingsboard.server.transport.lwm2m.server.ota.firmware.FirmwareUpdate
import org.thingsboard.server.transport.lwm2m.server.ota.software.LwM2MSoftwareUpdateStrategy; import org.thingsboard.server.transport.lwm2m.server.ota.software.LwM2MSoftwareUpdateStrategy;
import org.thingsboard.server.transport.lwm2m.server.ota.software.SoftwareUpdateResult; import org.thingsboard.server.transport.lwm2m.server.ota.software.SoftwareUpdateResult;
import org.thingsboard.server.transport.lwm2m.server.ota.software.SoftwareUpdateState; import org.thingsboard.server.transport.lwm2m.server.ota.software.SoftwareUpdateState;
import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MClientOtaInfoStore;
import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@ -123,6 +124,7 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl
private final OtaPackageDataCache otaPackageDataCache; private final OtaPackageDataCache otaPackageDataCache;
private final LwM2MTelemetryLogService logService; private final LwM2MTelemetryLogService logService;
private final LwM2mTransportServerHelper helper; private final LwM2mTransportServerHelper helper;
private final TbLwM2MClientOtaInfoStore otaInfoStore;
@Autowired @Autowired
@Lazy @Lazy
@ -174,6 +176,7 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl
}, throwable -> { }, throwable -> {
if (fwInfo.isSupported()) { if (fwInfo.isSupported()) {
fwInfo.setTargetFetchFailure(true); fwInfo.setTargetFetchFailure(true);
update(fwInfo);
} }
}, executor); }, executor);
} }
@ -191,6 +194,7 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl
public void onTargetFirmwareUpdate(LwM2mClient client, String newFirmwareTitle, String newFirmwareVersion, Optional<String> newFirmwareUrl) { public void onTargetFirmwareUpdate(LwM2mClient client, String newFirmwareTitle, String newFirmwareVersion, Optional<String> newFirmwareUrl) {
LwM2MClientOtaInfo fwInfo = getOrInitFwInfo(client); LwM2MClientOtaInfo fwInfo = getOrInitFwInfo(client);
fwInfo.updateTarget(newFirmwareTitle, newFirmwareVersion, newFirmwareUrl); fwInfo.updateTarget(newFirmwareTitle, newFirmwareVersion, newFirmwareUrl);
update(fwInfo);
startFirmwareUpdateIfNeeded(client, fwInfo); startFirmwareUpdateIfNeeded(client, fwInfo);
} }
@ -202,7 +206,7 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl
} }
@Override @Override
public void onCurrentFirmwareStrategyUpdate(LwM2mClient client, OtherConfiguration configuration) { public void onFirmwareStrategyUpdate(LwM2mClient client, OtherConfiguration configuration) {
log.debug("[{}] Current fw strategy: {}", client.getEndpoint(), configuration.getFwUpdateStrategy()); log.debug("[{}] Current fw strategy: {}", client.getEndpoint(), configuration.getFwUpdateStrategy());
LwM2MClientOtaInfo fwInfo = getOrInitFwInfo(client); LwM2MClientOtaInfo fwInfo = getOrInitFwInfo(client);
fwInfo.setFwStrategy(LwM2MFirmwareUpdateStrategy.fromStrategyFwByCode(configuration.getFwUpdateStrategy())); fwInfo.setFwStrategy(LwM2MFirmwareUpdateStrategy.fromStrategyFwByCode(configuration.getFwUpdateStrategy()));
@ -242,9 +246,10 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl
executeFwUpdate(client); executeFwUpdate(client);
} }
fwInfo.setUpdateState(state); fwInfo.setUpdateState(state);
Optional<OtaPackageUpdateStatus> status = this.toOtaPackageUpdateStatus(state); Optional<OtaPackageUpdateStatus> status = toOtaPackageUpdateStatus(state);
status.ifPresent(otaStatus -> sendStateUpdateToTelemetry(client, fwInfo, status.ifPresent(otaStatus -> sendStateUpdateToTelemetry(client, fwInfo,
otaStatus, "Firmware Update State: " + state.name())); otaStatus, "Firmware Update State: " + state.name()));
update(fwInfo);
} }
@Override @Override
@ -252,15 +257,16 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl
log.debug("[{}] Current fw result: {}", client.getEndpoint(), code); log.debug("[{}] Current fw result: {}", client.getEndpoint(), code);
LwM2MClientOtaInfo fwInfo = getOrInitFwInfo(client); LwM2MClientOtaInfo fwInfo = getOrInitFwInfo(client);
FirmwareUpdateResult result = FirmwareUpdateResult.fromUpdateResultFwByCode(code.intValue()); FirmwareUpdateResult result = FirmwareUpdateResult.fromUpdateResultFwByCode(code.intValue());
Optional<OtaPackageUpdateStatus> status = this.toOtaPackageUpdateStatus(result); Optional<OtaPackageUpdateStatus> status = toOtaPackageUpdateStatus(result);
status.ifPresent(otaStatus -> sendStateUpdateToTelemetry(client, fwInfo, status.ifPresent(otaStatus -> sendStateUpdateToTelemetry(client, fwInfo,
otaStatus, "Firmware Update Result: " + result.name())); otaStatus, "Firmware Update Result: " + result.name()));
if (result.isAgain() && fwInfo.getRetryAttempts() <= 2) { if (result.isAgain() && fwInfo.getRetryAttempts() <= 2) {
fwInfo.setRetryAttempts(fwInfo.getRetryAttempts() + 1); fwInfo.setRetryAttempts(fwInfo.getRetryAttempts() + 1);
startFirmwareUpdateIfNeeded(client, fwInfo); startFirmwareUpdateIfNeeded(client, fwInfo);
} else { } else {
fwInfo.setUpdateResult(result); fwInfo.update(result);
} }
update(fwInfo);
} }
@Override @Override
@ -378,23 +384,38 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl
} }
public LwM2MClientOtaInfo getOrInitFwInfo(LwM2mClient client) { public LwM2MClientOtaInfo getOrInitFwInfo(LwM2mClient client) {
//TODO: fetch state from the cache or DB.
return this.fwStates.computeIfAbsent(client.getEndpoint(), endpoint -> { return this.fwStates.computeIfAbsent(client.getEndpoint(), endpoint -> {
LwM2MClientOtaInfo info = otaInfoStore.get(OtaPackageType.FIRMWARE, endpoint);
if (info == null) {
var profile = clientContext.getProfile(client.getProfileId()); var profile = clientContext.getProfile(client.getProfileId());
return new LwM2MClientOtaInfo(endpoint, OtaPackageType.FIRMWARE, profile.getClientLwM2mSettings().getFwUpdateStrategy(), info = new LwM2MClientOtaInfo(endpoint, OtaPackageType.FIRMWARE,
LwM2MFirmwareUpdateStrategy.fromStrategyFwByCode(profile.getClientLwM2mSettings().getFwUpdateStrategy()),
profile.getClientLwM2mSettings().getFwUpdateResource()); profile.getClientLwM2mSettings().getFwUpdateResource());
update(info);
}
return info;
}); });
} }
private LwM2MClientOtaInfo getOrInitSwInfo(LwM2mClient client) { private LwM2MClientOtaInfo getOrInitSwInfo(LwM2mClient client) {
//TODO: fetch state from the cache or DB. return this.fwStates.computeIfAbsent(client.getEndpoint(), endpoint -> {
return swStates.computeIfAbsent(client.getEndpoint(), endpoint -> { LwM2MClientOtaInfo info = otaInfoStore.get(OtaPackageType.SOFTWARE, endpoint);
if (info == null) {
var profile = clientContext.getProfile(client.getProfileId()); var profile = clientContext.getProfile(client.getProfileId());
return new LwM2MClientOtaInfo(endpoint, OtaPackageType.SOFTWARE, profile.getClientLwM2mSettings().getSwUpdateStrategy(), profile.getClientLwM2mSettings().getSwUpdateResource()); info = new LwM2MClientOtaInfo(endpoint, OtaPackageType.SOFTWARE,
LwM2MSoftwareUpdateStrategy.fromStrategySwByCode(profile.getClientLwM2mSettings().getFwUpdateStrategy()),
profile.getClientLwM2mSettings().getSwUpdateResource());
update(info);
}
return info;
}); });
} }
private void update(LwM2MClientOtaInfo info) {
otaInfoStore.put(info);
}
private void sendStateUpdateToTelemetry(LwM2mClient client, LwM2MClientOtaInfo fwInfo, OtaPackageUpdateStatus status, String log) { private void sendStateUpdateToTelemetry(LwM2mClient client, LwM2MClientOtaInfo fwInfo, OtaPackageUpdateStatus status, String log) {
List<TransportProtos.KeyValueProto> result = new ArrayList<>(); List<TransportProtos.KeyValueProto> result = new ArrayList<>();
TransportProtos.KeyValueProto.Builder kvProto = TransportProtos.KeyValueProto.newBuilder().setKey(getAttributeKey(fwInfo.getType(), STATE)); TransportProtos.KeyValueProto.Builder kvProto = TransportProtos.KeyValueProto.newBuilder().setKey(getAttributeKey(fwInfo.getType(), STATE));

View File

@ -15,7 +15,9 @@
*/ */
package org.thingsboard.server.transport.lwm2m.server.ota; package org.thingsboard.server.transport.lwm2m.server.ota;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor;
import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.ota.OtaPackageType; import org.thingsboard.server.common.data.ota.OtaPackageType;
import org.thingsboard.server.transport.lwm2m.server.ota.firmware.LwM2MFirmwareUpdateStrategy; import org.thingsboard.server.transport.lwm2m.server.ota.firmware.LwM2MFirmwareUpdateStrategy;
@ -26,10 +28,11 @@ import org.thingsboard.server.transport.lwm2m.server.ota.software.LwM2MSoftwareU
import java.util.Optional; import java.util.Optional;
@Data @Data
@NoArgsConstructor
public class LwM2MClientOtaInfo { public class LwM2MClientOtaInfo {
private final String endpoint; private String endpoint;
private final OtaPackageType type; private OtaPackageType type;
private String baseUrl; private String baseUrl;
@ -53,10 +56,17 @@ public class LwM2MClientOtaInfo {
private String failedPackageId; private String failedPackageId;
private int retryAttempts; private int retryAttempts;
public LwM2MClientOtaInfo(String endpoint, OtaPackageType type, Integer strategyCode, String baseUrl) { public LwM2MClientOtaInfo(String endpoint, OtaPackageType type, LwM2MFirmwareUpdateStrategy fwStrategy, String baseUrl) {
this.endpoint = endpoint; this.endpoint = endpoint;
this.type = type; this.type = type;
this.fwStrategy = LwM2MFirmwareUpdateStrategy.fromStrategyFwByCode(strategyCode); this.fwStrategy = fwStrategy;
this.baseUrl = baseUrl;
}
public LwM2MClientOtaInfo(String endpoint, OtaPackageType type, LwM2MSoftwareUpdateStrategy swStrategy, String baseUrl) {
this.endpoint = endpoint;
this.type = type;
this.swStrategy = swStrategy;
this.baseUrl = baseUrl; this.baseUrl = baseUrl;
} }
@ -66,6 +76,7 @@ public class LwM2MClientOtaInfo {
this.targetUrl = newFirmwareUrl.orElse(null); this.targetUrl = newFirmwareUrl.orElse(null);
} }
@JsonIgnore
public boolean isUpdateRequired() { public boolean isUpdateRequired() {
if (StringUtils.isEmpty(targetName) || StringUtils.isEmpty(targetVersion) || !isSupported()) { if (StringUtils.isEmpty(targetName) || StringUtils.isEmpty(targetVersion) || !isSupported()) {
return false; return false;
@ -86,11 +97,12 @@ public class LwM2MClientOtaInfo {
} }
} }
@JsonIgnore
public boolean isSupported() { public boolean isSupported() {
return StringUtils.isNotEmpty(currentName) || StringUtils.isNotEmpty(currentVersion5) || StringUtils.isNotEmpty(currentVersion3); return StringUtils.isNotEmpty(currentName) || StringUtils.isNotEmpty(currentVersion5) || StringUtils.isNotEmpty(currentVersion3);
} }
public void setUpdateResult(FirmwareUpdateResult updateResult) { public void update(FirmwareUpdateResult updateResult) {
this.updateResult = updateResult; this.updateResult = updateResult;
switch (updateResult) { switch (updateResult) {
case INITIAL: case INITIAL:

View File

@ -32,7 +32,7 @@ public interface LwM2MOtaUpdateService {
void onCurrentFirmwareNameUpdate(LwM2mClient client, String name); void onCurrentFirmwareNameUpdate(LwM2mClient client, String name);
void onCurrentFirmwareStrategyUpdate(LwM2mClient client, OtherConfiguration configuration); void onFirmwareStrategyUpdate(LwM2mClient client, OtherConfiguration configuration);
void onCurrentSoftwareStrategyUpdate(LwM2mClient client, OtherConfiguration configuration); void onCurrentSoftwareStrategyUpdate(LwM2mClient client, OtherConfiguration configuration);

View File

@ -0,0 +1,67 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.lwm2m.server.session;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.service.DefaultTransportService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
import org.thingsboard.server.transport.lwm2m.server.LwM2mSessionMsgListener;
import org.thingsboard.server.transport.lwm2m.server.attributes.LwM2MAttributesService;
import org.thingsboard.server.transport.lwm2m.server.rpc.LwM2MRpcRequestHandler;
import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler;
@Slf4j
@Service
@TbLwM2mTransportComponent
public class DefaultLwM2MSessionManager implements LwM2MSessionManager {
private final TransportService transportService;
private final LwM2MAttributesService attributesService;
private final LwM2MRpcRequestHandler rpcHandler;
private final LwM2mUplinkMsgHandler uplinkHandler;
public DefaultLwM2MSessionManager(TransportService transportService,
@Lazy LwM2MAttributesService attributesService,
@Lazy LwM2MRpcRequestHandler rpcHandler,
@Lazy LwM2mUplinkMsgHandler uplinkHandler) {
this.transportService = transportService;
this.attributesService = attributesService;
this.rpcHandler = rpcHandler;
this.uplinkHandler = uplinkHandler;
}
@Override
public void register(TransportProtos.SessionInfoProto sessionInfo) {
transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(uplinkHandler, attributesService, rpcHandler, sessionInfo, transportService));
TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder()
.setSessionInfo(sessionInfo)
.setSessionEvent(DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN))
.setSubscribeToAttributes(TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setSessionType(TransportProtos.SessionType.ASYNC).build())
.setSubscribeToRPC(TransportProtos.SubscribeToRPCMsg.newBuilder().setSessionType(TransportProtos.SessionType.ASYNC).build())
.build();
transportService.process(msg, null);
}
@Override
public void deregister(TransportProtos.SessionInfoProto sessionInfo) {
transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
transportService.deregisterSession(sessionInfo);
}
}

View File

@ -0,0 +1,27 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.lwm2m.server.session;
import org.thingsboard.server.gen.transport.TransportProtos;
public interface LwM2MSessionManager {
void register(TransportProtos.SessionInfoProto sessionInfo);
void deregister(TransportProtos.SessionInfoProto sessionInfo);
}

View File

@ -0,0 +1,32 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.lwm2m.server.store;
import org.thingsboard.server.common.data.ota.OtaPackageType;
import org.thingsboard.server.transport.lwm2m.server.ota.LwM2MClientOtaInfo;
public class TbDummyLwM2MClientOtaInfoStore implements TbLwM2MClientOtaInfoStore {
@Override
public LwM2MClientOtaInfo get(OtaPackageType type, String endpoint) {
return null;
}
@Override
public void put(LwM2MClientOtaInfo info) {
}
}

View File

@ -1,3 +1,18 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.lwm2m.server.store; package org.thingsboard.server.transport.lwm2m.server.store;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;

View File

@ -0,0 +1,26 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.lwm2m.server.store;
import org.thingsboard.server.common.data.ota.OtaPackageType;
import org.thingsboard.server.transport.lwm2m.server.ota.LwM2MClientOtaInfo;
public interface TbLwM2MClientOtaInfoStore {
LwM2MClientOtaInfo get(OtaPackageType type, String endpoint);
void put(LwM2MClientOtaInfo info);
}

View File

@ -1,3 +1,18 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.lwm2m.server.store; package org.thingsboard.server.transport.lwm2m.server.store;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;

View File

@ -0,0 +1,54 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.lwm2m.server.store;
import org.eclipse.leshan.server.security.NonUniqueSecurityInfoException;
import org.eclipse.leshan.server.security.SecurityInfo;
import org.nustaq.serialization.FSTConfiguration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.integration.redis.util.RedisLockRegistry;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.ota.OtaPackageType;
import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MSecurityInfo;
import org.thingsboard.server.transport.lwm2m.server.ota.LwM2MClientOtaInfo;
import java.util.concurrent.locks.Lock;
public class TbLwM2mRedisClientOtaInfoStore implements TbLwM2MClientOtaInfoStore {
private static final String OTA_EP = "OTA#EP#";
private final RedisConnectionFactory connectionFactory;
public TbLwM2mRedisClientOtaInfoStore(RedisConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
@Override
public LwM2MClientOtaInfo get(OtaPackageType type, String endpoint) {
try (var connection = connectionFactory.getConnection()) {
byte[] data = connection.get((OTA_EP + type + endpoint).getBytes());
return JacksonUtil.fromBytes(data, LwM2MClientOtaInfo.class);
}
}
@Override
public void put(LwM2MClientOtaInfo info) {
try (var connection = connectionFactory.getConnection()) {
connection.set((OTA_EP + info.getType() + info.getEndpoint()).getBytes(), JacksonUtil.toString(info).getBytes());
}
}
}

View File

@ -20,6 +20,7 @@ import org.eclipse.leshan.server.californium.registration.InMemoryRegistrationSt
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.server.cache.TBRedisCacheConfiguration; import org.thingsboard.server.cache.TBRedisCacheConfiguration;
import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
@ -46,14 +47,14 @@ public class TbLwM2mStoreFactory {
@Bean @Bean
private CaliforniumRegistrationStore registrationStore() { private CaliforniumRegistrationStore registrationStore() {
return redisConfiguration.isPresent() && useRedis ? return isRedis() ?
new TbLwM2mRedisRegistrationStore(redisConfiguration.get().redisConnectionFactory()) : new InMemoryRegistrationStore(config.getCleanPeriodInSec()); new TbLwM2mRedisRegistrationStore(getConnectionFactory()) : new InMemoryRegistrationStore(config.getCleanPeriodInSec());
} }
@Bean @Bean
private TbMainSecurityStore securityStore() { private TbMainSecurityStore securityStore() {
return new TbLwM2mSecurityStore(redisConfiguration.isPresent() && useRedis ? return new TbLwM2mSecurityStore(isRedis() ?
new TbLwM2mRedisSecurityStore(redisConfiguration.get().redisConnectionFactory()) : new TbInMemorySecurityStore(), validator); new TbLwM2mRedisSecurityStore(getConnectionFactory()) : new TbInMemorySecurityStore(), validator);
} }
@Bean @Bean
@ -61,10 +62,22 @@ public class TbLwM2mStoreFactory {
return new TbDummyLwM2MClientStore(); return new TbDummyLwM2MClientStore();
} }
@Bean
private TbLwM2MClientOtaInfoStore otaStore() {
return isRedis() ? new TbLwM2mRedisClientOtaInfoStore(getConnectionFactory()) : new TbDummyLwM2MClientOtaInfoStore();
}
@Bean @Bean
private TbLwM2MDtlsSessionStore sessionStore() { private TbLwM2MDtlsSessionStore sessionStore() {
return redisConfiguration.isPresent() && useRedis ? return isRedis() ? new TbLwM2MDtlsSessionRedisStore(getConnectionFactory()) : new TbL2M2MDtlsSessionInMemoryStore();
new TbLwM2MDtlsSessionRedisStore(redisConfiguration.get().redisConnectionFactory()) : new TbL2M2MDtlsSessionInMemoryStore(); }
private RedisConnectionFactory getConnectionFactory() {
return redisConfiguration.get().redisConnectionFactory();
}
private boolean isRedis() {
return redisConfiguration.isPresent() && useRedis;
} }
} }

View File

@ -48,15 +48,11 @@ import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTrans
import org.thingsboard.server.common.data.ota.OtaPackageUtil; import org.thingsboard.server.common.data.ota.OtaPackageUtil;
import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.service.DefaultTransportService;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig; import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig;
import org.thingsboard.server.transport.lwm2m.server.LwM2mOtaConvert; import org.thingsboard.server.transport.lwm2m.server.LwM2mOtaConvert;
import org.thingsboard.server.transport.lwm2m.server.LwM2mQueuedRequest;
import org.thingsboard.server.transport.lwm2m.server.LwM2mSessionMsgListener;
import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportContext; import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportContext;
import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportServerHelper; import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportServerHelper;
import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil; import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil;
@ -84,6 +80,7 @@ import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteAttrib
import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService; import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService;
import org.thingsboard.server.transport.lwm2m.server.ota.LwM2MOtaUpdateService; import org.thingsboard.server.transport.lwm2m.server.ota.LwM2MOtaUpdateService;
import org.thingsboard.server.transport.lwm2m.server.rpc.LwM2MRpcRequestHandler; import org.thingsboard.server.transport.lwm2m.server.rpc.LwM2MRpcRequestHandler;
import org.thingsboard.server.transport.lwm2m.server.session.LwM2MSessionManager;
import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MDtlsSessionStore; import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MDtlsSessionStore;
import org.thingsboard.server.transport.lwm2m.utils.LwM2mValueConverterImpl; import org.thingsboard.server.transport.lwm2m.utils.LwM2mValueConverterImpl;
@ -129,6 +126,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
private final TransportService transportService; private final TransportService transportService;
private final LwM2mTransportContext context; private final LwM2mTransportContext context;
private final LwM2MAttributesService attributesService; private final LwM2MAttributesService attributesService;
private final LwM2MSessionManager sessionManager;
private final LwM2MOtaUpdateService otaService; private final LwM2MOtaUpdateService otaService;
private final LwM2MTransportServerConfig config; private final LwM2MTransportServerConfig config;
private final LwM2MTelemetryLogService logService; private final LwM2MTelemetryLogService logService;
@ -143,12 +141,14 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
LwM2mTransportServerHelper helper, LwM2mTransportServerHelper helper,
LwM2mClientContext clientContext, LwM2mClientContext clientContext,
LwM2MTelemetryLogService logService, LwM2MTelemetryLogService logService,
LwM2MSessionManager sessionManager,
@Lazy LwM2MOtaUpdateService otaService, @Lazy LwM2MOtaUpdateService otaService,
@Lazy LwM2MAttributesService attributesService, @Lazy LwM2MAttributesService attributesService,
@Lazy LwM2MRpcRequestHandler rpcHandler, @Lazy LwM2MRpcRequestHandler rpcHandler,
@Lazy LwM2mDownlinkMsgHandler defaultLwM2MDownlinkMsgHandler, @Lazy LwM2mDownlinkMsgHandler defaultLwM2MDownlinkMsgHandler,
LwM2mTransportContext context, TbLwM2MDtlsSessionStore sessionStore) { LwM2mTransportContext context, TbLwM2MDtlsSessionStore sessionStore) {
this.transportService = transportService; this.transportService = transportService;
this.sessionManager = sessionManager;
this.attributesService = attributesService; this.attributesService = attributesService;
this.otaService = otaService; this.otaService = otaService;
this.config = config; this.config = config;
@ -205,18 +205,10 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
Optional<SessionInfoProto> oldSessionInfo = this.clientContext.register(lwM2MClient, registration); Optional<SessionInfoProto> oldSessionInfo = this.clientContext.register(lwM2MClient, registration);
if (oldSessionInfo.isPresent()) { if (oldSessionInfo.isPresent()) {
log.info("[{}] Closing old session: {}", registration.getEndpoint(), new UUID(oldSessionInfo.get().getSessionIdMSB(), oldSessionInfo.get().getSessionIdLSB())); log.info("[{}] Closing old session: {}", registration.getEndpoint(), new UUID(oldSessionInfo.get().getSessionIdMSB(), oldSessionInfo.get().getSessionIdLSB()));
closeSession(oldSessionInfo.get()); sessionManager.deregister(oldSessionInfo.get());
} }
logService.log(lwM2MClient, LOG_LWM2M_INFO + ": Client registered with registration id: " + registration.getId()); logService.log(lwM2MClient, LOG_LWM2M_INFO + ": Client registered with registration id: " + registration.getId());
SessionInfoProto sessionInfo = lwM2MClient.getSession(); sessionManager.register(lwM2MClient.getSession());
transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, attributesService, rpcHandler, sessionInfo, transportService));
TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder()
.setSessionInfo(sessionInfo)
.setSessionEvent(DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN))
.setSubscribeToAttributes(TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setSessionType(TransportProtos.SessionType.ASYNC).build())
.setSubscribeToRPC(TransportProtos.SubscribeToRPCMsg.newBuilder().setSessionType(TransportProtos.SessionType.ASYNC).build())
.build();
transportService.process(msg, null);
this.initClientTelemetry(lwM2MClient); this.initClientTelemetry(lwM2MClient);
this.initAttributes(lwM2MClient); this.initAttributes(lwM2MClient);
otaService.init(lwM2MClient); otaService.init(lwM2MClient);
@ -273,7 +265,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
clientContext.unregister(client, registration); clientContext.unregister(client, registration);
SessionInfoProto sessionInfo = client.getSession(); SessionInfoProto sessionInfo = client.getSession();
if (sessionInfo != null) { if (sessionInfo != null) {
closeSession(sessionInfo); sessionManager.deregister(sessionInfo);
sessionStore.remove(registration.getEndpoint()); sessionStore.remove(registration.getEndpoint());
log.info("Client close session: [{}] unReg [{}] name [{}] profile ", registration.getId(), registration.getEndpoint(), sessionInfo.getDeviceType()); log.info("Client close session: [{}] unReg [{}] name [{}] profile ", registration.getId(), registration.getEndpoint(), sessionInfo.getDeviceType());
} else { } else {
@ -288,11 +280,6 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
}); });
} }
public void closeSession(SessionInfoProto sessionInfo) {
transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.CLOSED), null);
transportService.deregisterSession(sessionInfo);
}
@Override @Override
public void onSleepingDev(Registration registration) { public void onSleepingDev(Registration registration) {
log.info("[{}] [{}] Received endpoint Sleeping version event", registration.getId(), registration.getEndpoint()); log.info("[{}] [{}] Received endpoint Sleeping version event", registration.getId(), registration.getEndpoint());
@ -300,19 +287,6 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
//TODO: associate endpointId with device information. //TODO: associate endpointId with device information.
} }
// /**
// * Cancel observation for All objects for this registration
// */
// @Override
// public void setCancelObservationsAll(Registration registration) {
// if (registration != null) {
// LwM2mClient client = clientContext.getClientByEndpoint(registration.getEndpoint());
// if (client != null && client.getRegistration() != null && client.getRegistration().getId().equals(registration.getId())) {
// defaultLwM2MDownlinkMsgHandler.sendCancelAllRequest(client, TbLwM2MCancelAllRequest.builder().build(), new TbLwM2MCancelAllObserveRequestCallback(this, client));
// }
// }
// }
/** /**
* Sending observe value to thingsboard from ObservationListener.onResponse: object, instance, SingleResource or MultipleResource * Sending observe value to thingsboard from ObservationListener.onResponse: object, instance, SingleResource or MultipleResource
* *
@ -337,6 +311,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
this.updateResourcesValue(lwM2MClient, lwM2mResource, path); this.updateResourcesValue(lwM2MClient, lwM2mResource, path);
} }
} }
clientContext.update(lwM2MClient);
} }
} }
@ -375,16 +350,6 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
clientContext.getLwM2mClients().forEach(e -> e.deleteResources(pathIdVer, this.config.getModelProvider())); clientContext.getLwM2mClients().forEach(e -> e.deleteResources(pathIdVer, this.config.getModelProvider()));
} }
/**
* Deregister session in transport
*
* @param sessionInfo - lwm2m client
*/
@Override
public void doDisconnect(SessionInfoProto sessionInfo) {
closeSession(sessionInfo);
}
/** /**
* Those methods are called by the protocol stage thread pool, this means that execution MUST be done in a short delay, * Those methods are called by the protocol stage thread pool, this means that execution MUST be done in a short delay,
* * if you need to do long time processing use a dedicated thread pool. * * if you need to do long time processing use a dedicated thread pool.
@ -479,14 +444,6 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
attributesMap.forEach((targetId, params) -> sendWriteAttributesRequest(lwM2MClient, targetId, params)); attributesMap.forEach((targetId, params) -> sendWriteAttributesRequest(lwM2MClient, targetId, params));
} }
private void sendDiscoverRequests(LwM2mClient lwM2MClient, Lwm2mDeviceProfileTransportConfiguration profile, Set<String> supportedObjects) {
Set<String> targetIds = profile.getObserveAttr().getAttributeLwm2m().keySet();
targetIds = targetIds.stream().filter(target -> isSupportedTargetId(supportedObjects, target)).collect(Collectors.toSet());
// TODO: why do we need to put observe into pending read requests?
// lwM2MClient.getPendingReadRequests().addAll(targetIds);
targetIds.forEach(targetId -> sendDiscoverRequest(lwM2MClient, targetId));
}
private void sendDiscoverRequest(LwM2mClient lwM2MClient, String targetId) { private void sendDiscoverRequest(LwM2mClient lwM2MClient, String targetId) {
TbLwM2MDiscoverRequest request = TbLwM2MDiscoverRequest.builder().versionedId(targetId).timeout(this.config.getTimeout()).build(); TbLwM2MDiscoverRequest request = TbLwM2MDiscoverRequest.builder().versionedId(targetId).timeout(this.config.getTimeout()).build();
defaultLwM2MDownlinkMsgHandler.sendDiscoverRequest(lwM2MClient, request, new TbLwM2MDiscoverCallback(logService, lwM2MClient, targetId)); defaultLwM2MDownlinkMsgHandler.sendDiscoverRequest(lwM2MClient, request, new TbLwM2MDiscoverCallback(logService, lwM2MClient, targetId));
@ -637,7 +594,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
List<TransportProtos.KeyValueProto> resultAttributes = new ArrayList<>(); List<TransportProtos.KeyValueProto> resultAttributes = new ArrayList<>();
profile.getObserveAttr().getAttribute().forEach(pathIdVer -> { profile.getObserveAttr().getAttribute().forEach(pathIdVer -> {
if (path.contains(pathIdVer)) { if (path.contains(pathIdVer)) {
TransportProtos.KeyValueProto kvAttr = this.getKvToThingsboard(pathIdVer, registration); TransportProtos.KeyValueProto kvAttr = this.getKvToThingsBoard(pathIdVer, registration);
if (kvAttr != null) { if (kvAttr != null) {
resultAttributes.add(kvAttr); resultAttributes.add(kvAttr);
} }
@ -646,7 +603,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
List<TransportProtos.KeyValueProto> resultTelemetries = new ArrayList<>(); List<TransportProtos.KeyValueProto> resultTelemetries = new ArrayList<>();
profile.getObserveAttr().getTelemetry().forEach(pathIdVer -> { profile.getObserveAttr().getTelemetry().forEach(pathIdVer -> {
if (path.contains(pathIdVer)) { if (path.contains(pathIdVer)) {
TransportProtos.KeyValueProto kvAttr = this.getKvToThingsboard(pathIdVer, registration); TransportProtos.KeyValueProto kvAttr = this.getKvToThingsBoard(pathIdVer, registration);
if (kvAttr != null) { if (kvAttr != null) {
resultTelemetries.add(kvAttr); resultTelemetries.add(kvAttr);
} }
@ -663,7 +620,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
return null; return null;
} }
private TransportProtos.KeyValueProto getKvToThingsboard(String pathIdVer, Registration registration) { private TransportProtos.KeyValueProto getKvToThingsBoard(String pathIdVer, Registration registration) {
LwM2mClient lwM2MClient = this.clientContext.getClientByEndpoint(registration.getEndpoint()); LwM2mClient lwM2MClient = this.clientContext.getClientByEndpoint(registration.getEndpoint());
Map<String, String> names = clientContext.getProfile(lwM2MClient.getProfileId()).getObserveAttr().getKeyName(); Map<String, String> names = clientContext.getProfile(lwM2MClient.getProfileId()).getObserveAttr().getKeyName();
if (names != null && names.containsKey(pathIdVer)) { if (names != null && names.containsKey(pathIdVer)) {
@ -710,10 +667,12 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
public void onWriteResponseOk(LwM2mClient client, String path, WriteRequest request) { public void onWriteResponseOk(LwM2mClient client, String path, WriteRequest request) {
if (request.getNode() instanceof LwM2mResource) { if (request.getNode() instanceof LwM2mResource) {
this.updateResourcesValue(client, ((LwM2mResource) request.getNode()), path); this.updateResourcesValue(client, ((LwM2mResource) request.getNode()), path);
clientContext.update(client);
} else if (request.getNode() instanceof LwM2mObjectInstance) { } else if (request.getNode() instanceof LwM2mObjectInstance) {
((LwM2mObjectInstance) request.getNode()).getResources().forEach((resId, resource) -> { ((LwM2mObjectInstance) request.getNode()).getResources().forEach((resId, resource) -> {
this.updateResourcesValue(client, resource, path + "/" + resId); this.updateResourcesValue(client, resource, path + "/" + resId);
}); });
clientContext.update(client);
} }
} }
@ -788,7 +747,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
if (!newLwM2mSettings.getFwUpdateStrategy().equals(oldLwM2mSettings.getFwUpdateStrategy()) if (!newLwM2mSettings.getFwUpdateStrategy().equals(oldLwM2mSettings.getFwUpdateStrategy())
|| (StringUtils.isNotEmpty(newLwM2mSettings.getFwUpdateResource()) && || (StringUtils.isNotEmpty(newLwM2mSettings.getFwUpdateResource()) &&
!newLwM2mSettings.getFwUpdateResource().equals(oldLwM2mSettings.getFwUpdateResource()))) { !newLwM2mSettings.getFwUpdateResource().equals(oldLwM2mSettings.getFwUpdateResource()))) {
clients.forEach(lwM2MClient -> otaService.onCurrentFirmwareStrategyUpdate(lwM2MClient, newLwM2mSettings)); clients.forEach(lwM2MClient -> otaService.onFirmwareStrategyUpdate(lwM2MClient, newLwM2mSettings));
} }
if (!newLwM2mSettings.getSwUpdateStrategy().equals(oldLwM2mSettings.getSwUpdateStrategy()) if (!newLwM2mSettings.getSwUpdateStrategy().equals(oldLwM2mSettings.getSwUpdateStrategy())
@ -893,7 +852,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
*/ */
private void reportActivityAndRegister(SessionInfoProto sessionInfo) { private void reportActivityAndRegister(SessionInfoProto sessionInfo) {
if (sessionInfo != null && transportService.reportActivity(sessionInfo) == null) { if (sessionInfo != null && transportService.reportActivity(sessionInfo) == null) {
transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, attributesService, rpcHandler, sessionInfo, transportService)); sessionManager.register(sessionInfo);
this.reportActivitySubscription(sessionInfo); this.reportActivitySubscription(sessionInfo);
} }
} }

View File

@ -48,8 +48,6 @@ public interface LwM2mUplinkMsgHandler {
void onResourceDelete(Optional<TransportProtos.ResourceDeleteMsg> resourceDeleteMsgOpt); void onResourceDelete(Optional<TransportProtos.ResourceDeleteMsg> resourceDeleteMsgOpt);
void doDisconnect(TransportProtos.SessionInfoProto sessionInfo);
void onAwakeDev(Registration registration); void onAwakeDev(Registration registration);
void onWriteResponseOk(LwM2mClient client, String path, WriteRequest request); void onWriteResponseOk(LwM2mClient client, String path, WriteRequest request);

View File

@ -67,6 +67,15 @@ public class JacksonUtil {
} }
} }
public static <T> T fromBytes(byte[] bytes, Class<T> clazz) {
try {
return bytes != null ? OBJECT_MAPPER.readValue(bytes, clazz) : null;
} catch (IOException e) {
throw new IllegalArgumentException("The given string value: "
+ Arrays.toString(bytes) + " cannot be transformed to Json object", e);
}
}
public static JsonNode fromBytes(byte[] bytes) { public static JsonNode fromBytes(byte[] bytes) {
try { try {
return OBJECT_MAPPER.readTree(bytes); return OBJECT_MAPPER.readTree(bytes);
@ -96,7 +105,7 @@ public class JacksonUtil {
} }
} }
public static ObjectNode newObjectNode(){ public static ObjectNode newObjectNode() {
return OBJECT_MAPPER.createObjectNode(); return OBJECT_MAPPER.createObjectNode();
} }