From 3a7a97dd9171296d246c524d44e55960a3540c4c Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Tue, 20 Jul 2021 15:50:35 +0300 Subject: [PATCH] PSM and eDRX implementation draft --- .../src/main/resources/thingsboard.yml | 105 ++++---- .../server/coapserver/CoapServerContext.java | 8 + .../device/data/lwm2m/OtherConfiguration.java | 5 + common/queue/src/main/proto/queue.proto | 2 + .../transport/coap/CoapTransportResource.java | 2 +- .../transport/coap/TbCoapMessageObserver.java | 5 +- .../coap/client/CoapClientContext.java | 2 + .../coap/client/DefaultCoapClientContext.java | 254 +++++++++++++++++- .../coap/client/TbCoapClientState.java | 82 +++++- .../common/transport/TransportService.java | 2 + .../transport/auth/TransportDeviceInfo.java | 2 + .../service/DefaultTransportService.java | 28 ++ .../src/main/resources/tb-coap-transport.yml | 5 + .../src/main/resources/tb-http-transport.yml | 3 + .../src/main/resources/tb-lwm2m-transport.yml | 3 + .../src/main/resources/tb-mqtt-transport.yml | 4 + .../src/main/resources/tb-snmp-transport.yml | 3 + 17 files changed, 452 insertions(+), 63 deletions(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 4fd605e2f2..0cede00913 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -228,57 +228,57 @@ cassandra: # SQL configuration parameters sql: - # Specify batch size for persisting attribute updates - attributes: - batch_size: "${SQL_ATTRIBUTES_BATCH_SIZE:10000}" - batch_max_delay: "${SQL_ATTRIBUTES_BATCH_MAX_DELAY_MS:100}" - stats_print_interval_ms: "${SQL_ATTRIBUTES_BATCH_STATS_PRINT_MS:10000}" - batch_threads: "${SQL_ATTRIBUTES_BATCH_THREADS:4}" - ts: - batch_size: "${SQL_TS_BATCH_SIZE:10000}" - batch_max_delay: "${SQL_TS_BATCH_MAX_DELAY_MS:100}" - stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}" - batch_threads: "${SQL_TS_BATCH_THREADS:4}" - ts_latest: - batch_size: "${SQL_TS_LATEST_BATCH_SIZE:10000}" - batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:100}" - stats_print_interval_ms: "${SQL_TS_LATEST_BATCH_STATS_PRINT_MS:10000}" - batch_threads: "${SQL_TS_LATEST_BATCH_THREADS:4}" - update_by_latest_ts: "${SQL_TS_UPDATE_BY_LATEST_TIMESTAMP:true}" - # Specify whether to sort entities before batch update. Should be enabled for cluster mode to avoid deadlocks - batch_sort: "${SQL_BATCH_SORT:false}" - # Specify whether to remove null characters from strValue of attributes and timeseries before insert - remove_null_chars: "${SQL_REMOVE_NULL_CHARS:true}" + # Specify batch size for persisting attribute updates + attributes: + batch_size: "${SQL_ATTRIBUTES_BATCH_SIZE:10000}" + batch_max_delay: "${SQL_ATTRIBUTES_BATCH_MAX_DELAY_MS:100}" + stats_print_interval_ms: "${SQL_ATTRIBUTES_BATCH_STATS_PRINT_MS:10000}" + batch_threads: "${SQL_ATTRIBUTES_BATCH_THREADS:4}" + ts: + batch_size: "${SQL_TS_BATCH_SIZE:10000}" + batch_max_delay: "${SQL_TS_BATCH_MAX_DELAY_MS:100}" + stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}" + batch_threads: "${SQL_TS_BATCH_THREADS:4}" + ts_latest: + batch_size: "${SQL_TS_LATEST_BATCH_SIZE:10000}" + batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:100}" + stats_print_interval_ms: "${SQL_TS_LATEST_BATCH_STATS_PRINT_MS:10000}" + batch_threads: "${SQL_TS_LATEST_BATCH_THREADS:4}" + update_by_latest_ts: "${SQL_TS_UPDATE_BY_LATEST_TIMESTAMP:true}" + # Specify whether to sort entities before batch update. Should be enabled for cluster mode to avoid deadlocks + batch_sort: "${SQL_BATCH_SORT:false}" + # Specify whether to remove null characters from strValue of attributes and timeseries before insert + remove_null_chars: "${SQL_REMOVE_NULL_CHARS:true}" # Specify whether to log database queries and their parameters generated by entity query repository - log_queries: "${SQL_LOG_QUERIES:false}" - log_queries_threshold: "${SQL_LOG_QUERIES_THRESHOLD:5000}" - postgres: - # Specify partitioning size for timestamp key-value storage. Example: DAYS, MONTHS, YEARS, INDEFINITE. - ts_key_value_partitioning: "${SQL_POSTGRES_TS_KV_PARTITIONING:MONTHS}" - timescale: - # Specify Interval size for new data chunks storage. - chunk_time_interval: "${SQL_TIMESCALE_CHUNK_TIME_INTERVAL:604800000}" - batch_threads: "${SQL_TIMESCALE_BATCH_THREADS:4}" - ttl: - ts: - enabled: "${SQL_TTL_TS_ENABLED:true}" - execution_interval_ms: "${SQL_TTL_TS_EXECUTION_INTERVAL:86400000}" # Number of milliseconds. The current value corresponds to one day - ts_key_value_ttl: "${SQL_TTL_TS_TS_KEY_VALUE_TTL:0}" # Number of seconds - events: - enabled: "${SQL_TTL_EVENTS_ENABLED:true}" - execution_interval_ms: "${SQL_TTL_EVENTS_EXECUTION_INTERVAL:2220000}" # Number of milliseconds (max random initial delay and fixed period). # 37minutes to avoid common interval spikes - events_ttl: "${SQL_TTL_EVENTS_EVENTS_TTL:0}" # Number of seconds - debug_events_ttl: "${SQL_TTL_EVENTS_DEBUG_EVENTS_TTL:604800}" # Number of seconds. The current value corresponds to one week - edge_events: - enabled: "${SQL_TTL_EDGE_EVENTS_ENABLED:true}" - execution_interval_ms: "${SQL_TTL_EDGE_EVENTS_EXECUTION_INTERVAL:86400000}" # Number of milliseconds. The current value corresponds to one day - edge_events_ttl: "${SQL_TTL_EDGE_EVENTS_TTL:2628000}" # Number of seconds. The current value corresponds to one month - alarms: - checking_interval: "${SQL_ALARMS_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours - removal_batch_size: "${SQL_ALARMS_TTL_REMOVAL_BATCH_SIZE:3000}" # To delete outdated alarms not all at once but in batches - rpc: - enabled: "${SQL_TTL_RPC_ENABLED:true}" - checking_interval: "${SQL_RPC_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours + log_queries: "${SQL_LOG_QUERIES:false}" + log_queries_threshold: "${SQL_LOG_QUERIES_THRESHOLD:5000}" + postgres: + # Specify partitioning size for timestamp key-value storage. Example: DAYS, MONTHS, YEARS, INDEFINITE. + ts_key_value_partitioning: "${SQL_POSTGRES_TS_KV_PARTITIONING:MONTHS}" + timescale: + # Specify Interval size for new data chunks storage. + chunk_time_interval: "${SQL_TIMESCALE_CHUNK_TIME_INTERVAL:604800000}" + batch_threads: "${SQL_TIMESCALE_BATCH_THREADS:4}" + ttl: + ts: + enabled: "${SQL_TTL_TS_ENABLED:true}" + execution_interval_ms: "${SQL_TTL_TS_EXECUTION_INTERVAL:86400000}" # Number of milliseconds. The current value corresponds to one day + ts_key_value_ttl: "${SQL_TTL_TS_TS_KEY_VALUE_TTL:0}" # Number of seconds + events: + enabled: "${SQL_TTL_EVENTS_ENABLED:true}" + execution_interval_ms: "${SQL_TTL_EVENTS_EXECUTION_INTERVAL:2220000}" # Number of milliseconds (max random initial delay and fixed period). # 37minutes to avoid common interval spikes + events_ttl: "${SQL_TTL_EVENTS_EVENTS_TTL:0}" # Number of seconds + debug_events_ttl: "${SQL_TTL_EVENTS_DEBUG_EVENTS_TTL:604800}" # Number of seconds. The current value corresponds to one week + edge_events: + enabled: "${SQL_TTL_EDGE_EVENTS_ENABLED:true}" + execution_interval_ms: "${SQL_TTL_EDGE_EVENTS_EXECUTION_INTERVAL:86400000}" # Number of milliseconds. The current value corresponds to one day + edge_events_ttl: "${SQL_TTL_EDGE_EVENTS_TTL:2628000}" # Number of seconds. The current value corresponds to one month + alarms: + checking_interval: "${SQL_ALARMS_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours + removal_batch_size: "${SQL_ALARMS_TTL_REMOVAL_BATCH_SIZE:3000}" # To delete outdated alarms not all at once but in batches + rpc: + enabled: "${SQL_TTL_RPC_ENABLED:true}" + checking_interval: "${SQL_RPC_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours # Actor system parameters actors: @@ -573,6 +573,9 @@ transport: timeout: "${CLIENT_SIDE_RPC_TIMEOUT:60000}" # Enable/disable http/mqtt/coap transport protocols (has higher priority than certain protocol's 'enabled' property) api_enabled: "${TB_TRANSPORT_API_ENABLED:true}" + log: + enabled: "${TB_TRANSPORT_LOG_ENABLED:true}" + max_length: "${TB_TRANSPORT_LOG_MAX_LENGTH:1024}" # Local HTTP transport parameters http: enabled: "${HTTP_ENABLED:true}" @@ -618,6 +621,8 @@ transport: bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}" bind_port: "${COAP_BIND_PORT:5683}" timeout: "${COAP_TIMEOUT:10000}" + psm_activity_timer: "${COAP_PSM_ACTIVITY_TIMER:10000}" + paging_transmission_window: "${COAP_PAGING_TRANSMISSION_WINDOW:10000}" dtls: # Enable/disable DTLS 1.2 support enabled: "${COAP_DTLS_ENABLED:false}" diff --git a/common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerContext.java b/common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerContext.java index 0f5aae8bd8..f6c597d35f 100644 --- a/common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerContext.java +++ b/common/coap-server/src/main/java/org/thingsboard/server/coapserver/CoapServerContext.java @@ -38,6 +38,14 @@ public class CoapServerContext { @Value("${transport.coap.timeout}") private Long timeout; + @Getter + @Value("${transport.coap.psm_activity_timer:10000}") + private long psmActivityTimer; + + @Getter + @Value("${transport.coap.paging_transmission_window:10000}") + private long pagingTransmissionWindow; + @Getter @Autowired(required = false) private TbCoapDtlsSettings dtlsSettings; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/device/data/lwm2m/OtherConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/device/data/lwm2m/OtherConfiguration.java index c75008f24d..4722246b4f 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/device/data/lwm2m/OtherConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/device/data/lwm2m/OtherConfiguration.java @@ -17,6 +17,7 @@ package org.thingsboard.server.common.data.device.data.lwm2m; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.Data; +import org.thingsboard.server.common.data.device.data.PowerMode; import org.thingsboard.server.common.data.device.data.PowerSavingConfiguration; @Data @@ -26,6 +27,10 @@ public class OtherConfiguration extends PowerSavingConfiguration { private Integer fwUpdateStrategy; private Integer swUpdateStrategy; private Integer clientOnlyObserveAfterConnect; + private PowerMode powerMode; + private Long psmActivityTimer; + private Long edrxCycle; + private Long pagingTransmissionWindow; private String fwUpdateResource; private String swUpdateResource; private boolean compositeOperationsSupport; diff --git a/common/queue/src/main/proto/queue.proto b/common/queue/src/main/proto/queue.proto index 9b09bf2328..811704fec0 100644 --- a/common/queue/src/main/proto/queue.proto +++ b/common/queue/src/main/proto/queue.proto @@ -117,6 +117,8 @@ message DeviceInfoProto { int64 customerIdLSB = 11; string powerMode = 12; int64 edrxCycle = 13; + int64 psmActivityTimer = 14; + int64 pagingTransmissionWindow = 15; } /** diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java index 16d3aba7d6..59aae12777 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java @@ -234,7 +234,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { TbCoapClientState clientState = null; try { clientState = clients.getOrCreateClient(type, deviceCredentials, deviceProfile); - clientState.updateLastUplinkTime(); + clients.awake(clientState); switch (type) { case POST_ATTRIBUTES_REQUEST: handlePostAttributesRequest(clientState, exchange, request); diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/TbCoapMessageObserver.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/TbCoapMessageObserver.java index c0ed1422c7..b704dfd8ae 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/TbCoapMessageObserver.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/TbCoapMessageObserver.java @@ -27,6 +27,7 @@ public class TbCoapMessageObserver implements MessageObserver { private final int msgId; private final Consumer onAcknowledge; + private final Consumer onTimeout; @Override public void onRetransmission() { @@ -50,7 +51,9 @@ public class TbCoapMessageObserver implements MessageObserver { @Override public void onTimeout() { - + if (onTimeout != null) { + onTimeout.accept(msgId); + } } @Override diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/CoapClientContext.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/CoapClientContext.java index bbd1dc2c76..4ed2b41f1a 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/CoapClientContext.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/CoapClientContext.java @@ -46,4 +46,6 @@ public interface CoapClientContext { void registerObserveRelation(String token, ObserveRelation relation); void deregisterObserveRelation(String token); + + boolean awake(TbCoapClientState client); } diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java index b76eced709..07503af03a 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java @@ -22,8 +22,13 @@ import org.eclipse.californium.core.coap.Response; import org.eclipse.californium.core.observe.ObserveRelation; import org.eclipse.californium.core.server.resources.CoapExchange; import org.springframework.stereotype.Service; +import org.thingsboard.server.coapserver.CoapServerContext; import org.thingsboard.server.coapserver.TbCoapServerComponent; +import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.DeviceTransportType; +import org.thingsboard.server.common.data.device.data.PowerMode; +import org.thingsboard.server.common.data.device.data.PowerSavingConfiguration; import org.thingsboard.server.common.data.device.profile.CoapDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.device.profile.CoapDeviceTypeConfiguration; import org.thingsboard.server.common.data.device.profile.DefaultCoapDeviceTypeConfiguration; @@ -33,9 +38,11 @@ import org.thingsboard.server.common.data.device.profile.JsonTransportPayloadCon import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration; import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.msg.session.FeatureType; import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.transport.SessionMsgListener; +import org.thingsboard.server.common.transport.TransportDeviceProfileCache; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.adaptor.AdaptorException; @@ -50,9 +57,11 @@ import org.thingsboard.server.transport.coap.callback.AbstractSyncSessionCallbac import org.thingsboard.server.transport.coap.callback.CoapNoOpCallback; import org.thingsboard.server.transport.coap.callback.CoapOkCallback; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -66,8 +75,10 @@ import static org.eclipse.californium.core.coap.Message.NONE; @TbCoapServerComponent public class DefaultCoapClientContext implements CoapClientContext { + private final CoapServerContext config; private final CoapTransportContext transportContext; private final TransportService transportService; + private final TransportDeviceProfileCache profileCache; private final ConcurrentMap clients = new ConcurrentHashMap<>(); private final ConcurrentMap clientsByToken = new ConcurrentHashMap<>(); @@ -148,6 +159,65 @@ public class DefaultCoapClientContext implements CoapClientContext { } } + private void onUplink(TbCoapClientState client) { + PowerMode powerMode = client.getPowerMode(); + PowerSavingConfiguration profileSettings = null; + if (powerMode == null) { + var clientProfile = getProfile(client.getProfileId()); + if (clientProfile.isPresent()) { + profileSettings = clientProfile.get().getClientSettings(); + powerMode = profileSettings.getPowerMode(); + if (powerMode == null) { + powerMode = PowerMode.DRX; + } + } + } + if (PowerMode.DRX.equals(powerMode)) { + client.updateLastUplinkTime(); + return; + } + client.lock(); + try { + long uplinkTime = client.updateLastUplinkTime(); + long timeout; + if (PowerMode.PSM.equals(powerMode)) { + Long psmActivityTimer = client.getPsmActivityTimer(); + if (psmActivityTimer == null && profileSettings != null) { + psmActivityTimer = profileSettings.getPsmActivityTimer(); + + } + if (psmActivityTimer == null || psmActivityTimer == 0L) { + psmActivityTimer = config.getPsmActivityTimer(); + } + + timeout = psmActivityTimer; + } else { + Long pagingTransmissionWindow = client.getPagingTransmissionWindow(); + if (pagingTransmissionWindow == null && profileSettings != null) { + pagingTransmissionWindow = profileSettings.getPagingTransmissionWindow(); + + } + if (pagingTransmissionWindow == null || pagingTransmissionWindow == 0L) { + pagingTransmissionWindow = config.getPagingTransmissionWindow(); + } + timeout = pagingTransmissionWindow; + } + Future sleepTask = client.getSleepTask(); + if (sleepTask != null) { + sleepTask.cancel(false); + } + Future task = transportContext.getScheduler().schedule(() -> { + if (uplinkTime == client.getLastUplinkTime()) { + asleep(client); + } + return null; + }, timeout, TimeUnit.MILLISECONDS); + client.setSleepTask(task); + } finally { + client.unlock(); + } + } + private boolean registerFeatureObservation(TbCoapClientState state, String token, CoapExchange exchange, FeatureType featureType) { state.lock(); try { @@ -182,7 +252,9 @@ public class DefaultCoapClientContext implements CoapClientContext { if (state.getSession() == null) { TransportProtos.SessionInfoProto session = SessionInfoCreator.create(state.getCredentials(), transportContext, UUID.randomUUID()); state.setSession(session); - transportService.registerAsyncSession(session, new CoapSessionListener(state)); + CoapSessionListener listener = new CoapSessionListener(state); + state.setListener(listener); + transportService.registerAsyncSession(session, state.getListener()); transportService.process(session, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); } if (FeatureType.ATTRIBUTES.equals(featureType)) { @@ -261,7 +333,7 @@ public class DefaultCoapClientContext implements CoapClientContext { state.setAdaptor(getCoapTransportAdaptor(state.getConfiguration().isJsonPayload())); } if (state.getCredentials() == null) { - state.setCredentials(deviceCredentials); + state.init(deviceCredentials); } } finally { state.unlock(); @@ -278,10 +350,6 @@ public class DefaultCoapClientContext implements CoapClientContext { return clients.computeIfAbsent(deviceId, TbCoapClientState::new); } - private static DeviceId toDeviceId(TransportProtos.SessionInfoProto s) { - return new DeviceId(new UUID(s.getDeviceIdMSB(), s.getDeviceIdLSB())); - } - private static TransportProtos.SessionEventMsg getSessionEventMsg(TransportProtos.SessionEvent event) { return TransportProtos.SessionEventMsg.newBuilder() .setSessionType(TransportProtos.SessionType.ASYNC) @@ -331,7 +399,7 @@ public class DefaultCoapClientContext implements CoapClientContext { } @RequiredArgsConstructor - private class CoapSessionListener implements SessionMsgListener { + public class CoapSessionListener implements SessionMsgListener { private final TbCoapClientState state; @@ -355,13 +423,28 @@ public class DefaultCoapClientContext implements CoapClientContext { @Override public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg msg) { + if (!isDownlinkAllowed(state)) { + log.trace("[{}] ignore downlink request cause client is sleeping.", state.getDeviceId()); + state.lock(); + try { + state.addQueuedNotification(msg); + } finally { + state.unlock(); + } + return; + } log.trace("[{}] Received attributes update notification to device", sessionId); TbCoapObservationState attrs = state.getAttrs(); if (attrs != null) { try { boolean conRequest = AbstractSyncSessionCallback.isConRequest(state.getAttrs()); + int requestId = getNextMsgId(); Response response = state.getAdaptor().convertToPublish(conRequest, msg); + response.setMID(requestId); attrs.getExchange().respond(response); + if (conRequest) { + response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> awake(state), id -> asleep(state))); + } } catch (AdaptorException e) { log.trace("[{}] Failed to reply due to error", state.getDeviceId(), e); cancelObserveRelation(attrs); @@ -372,6 +455,17 @@ public class DefaultCoapClientContext implements CoapClientContext { } } + @Override + public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional deviceProfileOpt) { + state.onDeviceUpdate(device); + } + + @Override + public void onDeviceDeleted(DeviceId deviceId) { + cancelRpcSubscription(state); + cancelAttributeSubscription(state); + } + @Override public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) { log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage()); @@ -382,6 +476,10 @@ public class DefaultCoapClientContext implements CoapClientContext { @Override public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg msg) { log.trace("[{}] Received RPC command to device", sessionId); + if (!isDownlinkAllowed(state)) { + log.trace("[{}] ignore downlink request cause client is sleeping.", state.getDeviceId()); + return; + } boolean sent = false; boolean conRequest = AbstractSyncSessionCallback.isConRequest(state.getRpc()); try { @@ -401,7 +499,10 @@ public class DefaultCoapClientContext implements CoapClientContext { if (rpcRequestMsg != null) { transportService.process(state.getSession(), rpcRequestMsg, false, TransportServiceCallback.EMPTY); } - })); + }, null)); + } + if (conRequest) { + response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> awake(state), id -> asleep(state))); } state.getRpc().getExchange().respond(response); sent = true; @@ -428,6 +529,143 @@ public class DefaultCoapClientContext implements CoapClientContext { } } + private boolean asleep(TbCoapClientState client) { + boolean changed = compareAndSetSleepFlag(client, true); + if (changed) { + log.debug("[{}] client is sleeping", client.getDeviceId()); + transportService.log(client.getSession(), "Info: Client is sleeping!"); + } + return changed; + } + + @Override + public boolean awake(TbCoapClientState client) { + onUplink(client); + boolean changed = compareAndSetSleepFlag(client, false); + if (changed) { + log.debug("[{}] client is awake", client.getDeviceId()); + transportService.log(client.getSession(), "Info: Client is awake!"); + sendMsgsAfterSleeping(client); + } + return changed; + } + + private void sendMsgsAfterSleeping(TbCoapClientState client) { + if (client.getRpc() != null) { + TransportProtos.TransportToDeviceActorMsg persistentRpcRequestMsg = TransportProtos.TransportToDeviceActorMsg + .newBuilder() + .setSessionInfo(client.getSession()) + .setSendPendingRPC(TransportProtos.SendPendingRPCMsg.newBuilder().build()) + .build(); + transportService.process(persistentRpcRequestMsg, TransportServiceCallback.EMPTY); + } + if (client.getAttrs() != null && client.getMissedAttributeUpdates() != null) { + client.getListener().onAttributeUpdate(new UUID(client.getSession().getSessionIdMSB(), client.getSession().getSessionIdLSB()), client.getAndClearMissedUpdates()); + } + } + + private boolean compareAndSetSleepFlag(TbCoapClientState client, boolean sleeping) { + if (sleeping == client.isAsleep()) { + log.trace("[{}] Client is already at sleeping: {}, ignoring event: {}", client.getDeviceId(), client.isAsleep(), sleeping); + return false; + } + client.lock(); + try { + if (sleeping == client.isAsleep()) { + log.trace("[{}] Client is already at sleeping: {}, ignoring event: {}", client.getDeviceId(), client.isAsleep(), sleeping); + return false; + } else { + PowerMode powerMode = getPowerMode(client); + if (PowerMode.PSM.equals(powerMode) || PowerMode.E_DRX.equals(powerMode)) { + log.trace("[{}] Switch sleeping from: {} to: {}", client.getDeviceId(), client.isAsleep(), sleeping); + client.setAsleep(sleeping); + // TODO: persist changes. + // update(client); + return true; + } else { + return false; + } + } + } finally { + client.unlock(); + } + } + + private boolean isDownlinkAllowed(TbCoapClientState client) { + PowerMode powerMode = client.getPowerMode(); + PowerSavingConfiguration profileSettings = null; + if (powerMode == null) { + var clientProfile = getProfile(client.getProfileId()); + if (clientProfile.isPresent()) { + profileSettings = clientProfile.get().getClientSettings(); + powerMode = profileSettings.getPowerMode(); + if (powerMode == null) { + powerMode = PowerMode.DRX; + } + } + } + if (PowerMode.DRX.equals(powerMode)) { + return true; + } + client.lock(); + long timeSinceLastUplink = System.currentTimeMillis() - client.getLastUplinkTime(); + try { + if (PowerMode.PSM.equals(powerMode)) { + Long psmActivityTimer = client.getPsmActivityTimer(); + if (psmActivityTimer == null && profileSettings != null) { + psmActivityTimer = profileSettings.getPsmActivityTimer(); + + } + if (psmActivityTimer == null || psmActivityTimer == 0L) { + psmActivityTimer = config.getPsmActivityTimer(); + } + return timeSinceLastUplink <= psmActivityTimer; + } else { + Long pagingTransmissionWindow = client.getPagingTransmissionWindow(); + if (pagingTransmissionWindow == null && profileSettings != null) { + pagingTransmissionWindow = profileSettings.getPagingTransmissionWindow(); + + } + if (pagingTransmissionWindow == null || pagingTransmissionWindow == 0L) { + pagingTransmissionWindow = config.getPagingTransmissionWindow(); + } + boolean allowed = timeSinceLastUplink <= pagingTransmissionWindow; + if (!allowed) { + return client.checkFirstDownlink(); + } else { + return true; + } + } + } finally { + client.unlock(); + } + } + + private PowerMode getPowerMode(TbCoapClientState client) { + PowerMode powerMode = client.getPowerMode(); + if (powerMode == null) { + Optional deviceProfile = getProfile(client.getProfileId()); + if (deviceProfile.isPresent()) { + powerMode = deviceProfile.get().getClientSettings().getPowerMode(); + } else { + powerMode = PowerMode.PSM; + } + } + return powerMode; + } + + public Optional getProfile(DeviceProfileId profileId) { + DeviceProfile deviceProfile = profileCache.get(profileId); + if (deviceProfile.getTransportType().equals(DeviceTransportType.COAP)) { + return Optional.of((CoapDeviceProfileTransportConfiguration) deviceProfile.getProfileData().getTransportConfiguration()); + } else if (deviceProfile.getTransportType().equals(DeviceTransportType.DEFAULT)) { + return Optional.empty(); + } else { + log.warn("[{}] Invalid device profile type: {}", profileId, deviceProfile.getTransportType()); + throw new IllegalArgumentException("Invalid device profile type: " + deviceProfile.getTransportType()); + } + } + protected int getNextMsgId() { return ThreadLocalRandom.current().nextInt(NONE, MAX_MID + 1); } diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/TbCoapClientState.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/TbCoapClientState.java index 9b6c7c6d9d..9861b4f783 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/TbCoapClientState.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/TbCoapClientState.java @@ -18,12 +18,26 @@ package org.thingsboard.server.transport.coap.client; import lombok.Data; import lombok.Getter; import lombok.Setter; +import org.eclipse.leshan.server.registration.Registration; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.DeviceTransportType; +import org.thingsboard.server.common.data.device.data.CoapDeviceTransportConfiguration; +import org.thingsboard.server.common.data.device.data.PowerMode; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.coap.TransportConfigurationContainer; import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.Future; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -37,12 +51,22 @@ public class TbCoapClientState { private volatile TransportConfigurationContainer configuration; private volatile CoapTransportAdaptor adaptor; private volatile ValidateDeviceCredentialsResponse credentials; - private volatile TransportProtos.SessionInfoProto session; - + private volatile DefaultCoapClientContext.CoapSessionListener listener; private volatile TbCoapObservationState attrs; private volatile TbCoapObservationState rpc; + private TransportProtos.AttributeUpdateNotificationMsg missedAttributeUpdates; + private DeviceProfileId profileId; + + @Getter + private PowerMode powerMode; + @Getter + private Long psmActivityTimer; + @Getter + private Long edrxCycle; + @Getter + private Long pagingTransmissionWindow; @Getter @Setter private boolean asleep; @@ -59,6 +83,14 @@ public class TbCoapClientState { this.lock = new ReentrantLock(); } + public void init(ValidateDeviceCredentialsResponse credentials) { + this.profileId = credentials.getDeviceInfo().getDeviceProfileId(); + this.powerMode = credentials.getDeviceInfo().getPowerMode(); + this.edrxCycle = credentials.getDeviceInfo().getEdrxCycle(); + this.psmActivityTimer = credentials.getDeviceInfo().getPsmActivityTimer(); + this.pagingTransmissionWindow = credentials.getDeviceInfo().getPagingTransmissionWindow(); + } + public void lock() { lock.lock(); } @@ -67,10 +99,54 @@ public class TbCoapClientState { lock.unlock(); } - public long updateLastUplinkTime(){ + public long updateLastUplinkTime() { this.lastUplinkTime = System.currentTimeMillis(); this.firstEdrxDownlink = true; return lastUplinkTime; } + public boolean checkFirstDownlink() { + boolean result = firstEdrxDownlink; + firstEdrxDownlink = false; + return result; + } + + public void onDeviceUpdate(Device device) { + this.profileId = device.getDeviceProfileId(); + var data = device.getDeviceData(); + if (data.getTransportConfiguration() != null && data.getTransportConfiguration().getType().equals(DeviceTransportType.COAP)) { + CoapDeviceTransportConfiguration configuration = (CoapDeviceTransportConfiguration) data.getTransportConfiguration(); + this.powerMode = configuration.getPowerMode(); + this.edrxCycle = configuration.getEdrxCycle(); + this.psmActivityTimer = configuration.getPsmActivityTimer(); + this.pagingTransmissionWindow = configuration.getPagingTransmissionWindow(); + } + } + + public void addQueuedNotification(TransportProtos.AttributeUpdateNotificationMsg msg) { + if (missedAttributeUpdates == null) { + missedAttributeUpdates = msg; + } else { + Map updatedAttrs = new HashMap<>(missedAttributeUpdates.getSharedUpdatedCount() + msg.getSharedUpdatedCount()); + Set deletedKeys = new HashSet<>(missedAttributeUpdates.getSharedDeletedCount() + msg.getSharedDeletedCount()); + for (TransportProtos.TsKvProto oldUpdatedAttrs : missedAttributeUpdates.getSharedUpdatedList()) { + updatedAttrs.put(oldUpdatedAttrs.getKv().getKey(), oldUpdatedAttrs); + } + deletedKeys.addAll(msg.getSharedDeletedList()); + for (TransportProtos.TsKvProto newUpdatedAttrs : msg.getSharedUpdatedList()) { + updatedAttrs.put(newUpdatedAttrs.getKv().getKey(), newUpdatedAttrs); + } + deletedKeys.addAll(msg.getSharedDeletedList()); + for (String deletedKey : msg.getSharedDeletedList()) { + updatedAttrs.remove(deletedKey); + } + missedAttributeUpdates = TransportProtos.AttributeUpdateNotificationMsg.newBuilder().addAllSharedUpdated(updatedAttrs.values()).addAllSharedDeleted(deletedKeys).build(); + } + } + + public TransportProtos.AttributeUpdateNotificationMsg getAndClearMissedUpdates() { + var result = this.missedAttributeUpdates; + this.missedAttributeUpdates = null; + return result; + } } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java index a3826f5844..9aff28d73f 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -126,4 +126,6 @@ public interface TransportService { SessionMetaData reportActivity(SessionInfoProto sessionInfo); void deregisterSession(SessionInfoProto sessionInfo); + + void log(SessionInfoProto sessionInfo, String msg); } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/TransportDeviceInfo.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/TransportDeviceInfo.java index 92bd389f60..af44cab01d 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/TransportDeviceInfo.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/TransportDeviceInfo.java @@ -36,4 +36,6 @@ public class TransportDeviceInfo implements Serializable { private PowerMode powerMode; private String additionalInfo; private Long edrxCycle; + private Long psmActivityTimer; + private Long pagingTransmissionWindow; } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 1a46f61fe9..a1503e86be 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -119,6 +119,12 @@ public class DefaultTransportService implements TransportService { public static final String OVERWRITE_ACTIVITY_TIME = "overwriteActivityTime"; + private final AtomicInteger atomicTs = new AtomicInteger(0); + + @Value("${transport.log.enabled:true}") + private boolean logEnabled; + @Value("${transport.log.max_length:1024}") + private int logMaxLength; @Value("${transport.sessions.inactivity_timeout}") private long sessionInactivityTimeout; @Value("${transport.sessions.report_timeout}") @@ -444,6 +450,8 @@ public class DefaultTransportService implements TransportService { if (StringUtils.isNotEmpty(di.getPowerMode())) { tdi.setPowerMode(PowerMode.valueOf(di.getPowerMode())); tdi.setEdrxCycle(di.getEdrxCycle()); + tdi.setPsmActivityTimer(di.getPsmActivityTimer()); + tdi.setPagingTransmissionWindow(di.getPagingTransmissionWindow()); } return tdi; } @@ -745,6 +753,26 @@ public class DefaultTransportService implements TransportService { sessions.remove(toSessionId(sessionInfo)); } + @Override + public void log(TransportProtos.SessionInfoProto sessionInfo, String msg) { + if (!logEnabled || sessionInfo == null || StringUtils.isEmpty(msg)) { + return; + } + if (msg.length() > logMaxLength) { + msg = msg.substring(0, logMaxLength); + } + TransportProtos.PostTelemetryMsg.Builder request = TransportProtos.PostTelemetryMsg.newBuilder(); + TransportProtos.TsKvListProto.Builder builder = TransportProtos.TsKvListProto.newBuilder(); + builder.setTs(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) * 1000L + (atomicTs.getAndIncrement() % 1000)); + builder.addKv(TransportProtos.KeyValueProto.newBuilder() + .setKey("transportLog") + .setType(TransportProtos.KeyValueType.STRING_V) + .setStringV(msg).build()); + request.addTsKvList(builder.build()); + TransportProtos.PostTelemetryMsg postTelemetryMsg = request.build(); + process(sessionInfo, postTelemetryMsg, TransportServiceCallback.EMPTY); + } + private boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback callback) { return checkLimits(sessionInfo, msg, callback, 0); } diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 1033301b5b..deea1934db 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -89,6 +89,8 @@ transport: bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}" bind_port: "${COAP_BIND_PORT:5683}" timeout: "${COAP_TIMEOUT:10000}" + psm_activity_timer: "${COAP_PSM_ACTIVITY_TIMER:10000}" + paging_transmission_window: "${COAP_PAGING_TRANSMISSION_WINDOW:10000}" dtls: # Enable/disable DTLS 1.2 support enabled: "${COAP_DTLS_ENABLED:false}" @@ -117,6 +119,9 @@ transport: type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}" # Maximum allowed string value length when processing Telemetry/Attributes JSON (0 value disables string value length check) max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}" + log: + enabled: "${TB_TRANSPORT_LOG_ENABLED:true}" + max_length: "${TB_TRANSPORT_LOG_MAX_LENGTH:1024}" queue: type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index e12c5308ba..b143f2fcd6 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -91,6 +91,9 @@ transport: type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}" # Maximum allowed string value length when processing Telemetry/Attributes JSON (0 value disables string value length check) max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}" + log: + enabled: "${TB_TRANSPORT_LOG_ENABLED:true}" + max_length: "${TB_TRANSPORT_LOG_MAX_LENGTH:1024}" queue: type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index b6f29ea9d7..e65fd0da04 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -98,6 +98,9 @@ transport: # Enable/disable http/mqtt/coap transport protocols (has higher priority than certain protocol's 'enabled' property) api_enabled: "${TB_TRANSPORT_API_ENABLED:true}" # Local LwM2M transport parameters + log: + enabled: "${TB_TRANSPORT_LOG_ENABLED:true}" + max_length: "${TB_TRANSPORT_LOG_MAX_LENGTH:1024}" lwm2m: # Enable/disable lvm2m transport protocol. enabled: "${LWM2M_ENABLED:true}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index f748bd872d..c678d5afed 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -123,6 +123,10 @@ transport: type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}" # Maximum allowed string value length when processing Telemetry/Attributes JSON (0 value disables string value length check) max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}" + log: + enabled: "${TB_TRANSPORT_LOG_ENABLED:true}" + max_length: "${TB_TRANSPORT_LOG_MAX_LENGTH:1024}" + queue: type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index cb6e7d5953..a7f1879947 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -56,6 +56,9 @@ transport: type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}" # Maximum allowed string value length when processing Telemetry/Attributes JSON (0 value disables string value length check) max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}" + log: + enabled: "${TB_TRANSPORT_LOG_ENABLED:true}" + max_length: "${TB_TRANSPORT_LOG_MAX_LENGTH:1024}" queue: type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)