PSM and eDRX implementation draft
This commit is contained in:
parent
4364755e11
commit
3a7a97dd91
@ -228,57 +228,57 @@ cassandra:
|
||||
|
||||
# SQL configuration parameters
|
||||
sql:
|
||||
# Specify batch size for persisting attribute updates
|
||||
attributes:
|
||||
batch_size: "${SQL_ATTRIBUTES_BATCH_SIZE:10000}"
|
||||
batch_max_delay: "${SQL_ATTRIBUTES_BATCH_MAX_DELAY_MS:100}"
|
||||
stats_print_interval_ms: "${SQL_ATTRIBUTES_BATCH_STATS_PRINT_MS:10000}"
|
||||
batch_threads: "${SQL_ATTRIBUTES_BATCH_THREADS:4}"
|
||||
ts:
|
||||
batch_size: "${SQL_TS_BATCH_SIZE:10000}"
|
||||
batch_max_delay: "${SQL_TS_BATCH_MAX_DELAY_MS:100}"
|
||||
stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}"
|
||||
batch_threads: "${SQL_TS_BATCH_THREADS:4}"
|
||||
ts_latest:
|
||||
batch_size: "${SQL_TS_LATEST_BATCH_SIZE:10000}"
|
||||
batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:100}"
|
||||
stats_print_interval_ms: "${SQL_TS_LATEST_BATCH_STATS_PRINT_MS:10000}"
|
||||
batch_threads: "${SQL_TS_LATEST_BATCH_THREADS:4}"
|
||||
update_by_latest_ts: "${SQL_TS_UPDATE_BY_LATEST_TIMESTAMP:true}"
|
||||
# Specify whether to sort entities before batch update. Should be enabled for cluster mode to avoid deadlocks
|
||||
batch_sort: "${SQL_BATCH_SORT:false}"
|
||||
# Specify whether to remove null characters from strValue of attributes and timeseries before insert
|
||||
remove_null_chars: "${SQL_REMOVE_NULL_CHARS:true}"
|
||||
# Specify batch size for persisting attribute updates
|
||||
attributes:
|
||||
batch_size: "${SQL_ATTRIBUTES_BATCH_SIZE:10000}"
|
||||
batch_max_delay: "${SQL_ATTRIBUTES_BATCH_MAX_DELAY_MS:100}"
|
||||
stats_print_interval_ms: "${SQL_ATTRIBUTES_BATCH_STATS_PRINT_MS:10000}"
|
||||
batch_threads: "${SQL_ATTRIBUTES_BATCH_THREADS:4}"
|
||||
ts:
|
||||
batch_size: "${SQL_TS_BATCH_SIZE:10000}"
|
||||
batch_max_delay: "${SQL_TS_BATCH_MAX_DELAY_MS:100}"
|
||||
stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}"
|
||||
batch_threads: "${SQL_TS_BATCH_THREADS:4}"
|
||||
ts_latest:
|
||||
batch_size: "${SQL_TS_LATEST_BATCH_SIZE:10000}"
|
||||
batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:100}"
|
||||
stats_print_interval_ms: "${SQL_TS_LATEST_BATCH_STATS_PRINT_MS:10000}"
|
||||
batch_threads: "${SQL_TS_LATEST_BATCH_THREADS:4}"
|
||||
update_by_latest_ts: "${SQL_TS_UPDATE_BY_LATEST_TIMESTAMP:true}"
|
||||
# Specify whether to sort entities before batch update. Should be enabled for cluster mode to avoid deadlocks
|
||||
batch_sort: "${SQL_BATCH_SORT:false}"
|
||||
# Specify whether to remove null characters from strValue of attributes and timeseries before insert
|
||||
remove_null_chars: "${SQL_REMOVE_NULL_CHARS:true}"
|
||||
# Specify whether to log database queries and their parameters generated by entity query repository
|
||||
log_queries: "${SQL_LOG_QUERIES:false}"
|
||||
log_queries_threshold: "${SQL_LOG_QUERIES_THRESHOLD:5000}"
|
||||
postgres:
|
||||
# Specify partitioning size for timestamp key-value storage. Example: DAYS, MONTHS, YEARS, INDEFINITE.
|
||||
ts_key_value_partitioning: "${SQL_POSTGRES_TS_KV_PARTITIONING:MONTHS}"
|
||||
timescale:
|
||||
# Specify Interval size for new data chunks storage.
|
||||
chunk_time_interval: "${SQL_TIMESCALE_CHUNK_TIME_INTERVAL:604800000}"
|
||||
batch_threads: "${SQL_TIMESCALE_BATCH_THREADS:4}"
|
||||
ttl:
|
||||
ts:
|
||||
enabled: "${SQL_TTL_TS_ENABLED:true}"
|
||||
execution_interval_ms: "${SQL_TTL_TS_EXECUTION_INTERVAL:86400000}" # Number of milliseconds. The current value corresponds to one day
|
||||
ts_key_value_ttl: "${SQL_TTL_TS_TS_KEY_VALUE_TTL:0}" # Number of seconds
|
||||
events:
|
||||
enabled: "${SQL_TTL_EVENTS_ENABLED:true}"
|
||||
execution_interval_ms: "${SQL_TTL_EVENTS_EXECUTION_INTERVAL:2220000}" # Number of milliseconds (max random initial delay and fixed period). # 37minutes to avoid common interval spikes
|
||||
events_ttl: "${SQL_TTL_EVENTS_EVENTS_TTL:0}" # Number of seconds
|
||||
debug_events_ttl: "${SQL_TTL_EVENTS_DEBUG_EVENTS_TTL:604800}" # Number of seconds. The current value corresponds to one week
|
||||
edge_events:
|
||||
enabled: "${SQL_TTL_EDGE_EVENTS_ENABLED:true}"
|
||||
execution_interval_ms: "${SQL_TTL_EDGE_EVENTS_EXECUTION_INTERVAL:86400000}" # Number of milliseconds. The current value corresponds to one day
|
||||
edge_events_ttl: "${SQL_TTL_EDGE_EVENTS_TTL:2628000}" # Number of seconds. The current value corresponds to one month
|
||||
alarms:
|
||||
checking_interval: "${SQL_ALARMS_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours
|
||||
removal_batch_size: "${SQL_ALARMS_TTL_REMOVAL_BATCH_SIZE:3000}" # To delete outdated alarms not all at once but in batches
|
||||
rpc:
|
||||
enabled: "${SQL_TTL_RPC_ENABLED:true}"
|
||||
checking_interval: "${SQL_RPC_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours
|
||||
log_queries: "${SQL_LOG_QUERIES:false}"
|
||||
log_queries_threshold: "${SQL_LOG_QUERIES_THRESHOLD:5000}"
|
||||
postgres:
|
||||
# Specify partitioning size for timestamp key-value storage. Example: DAYS, MONTHS, YEARS, INDEFINITE.
|
||||
ts_key_value_partitioning: "${SQL_POSTGRES_TS_KV_PARTITIONING:MONTHS}"
|
||||
timescale:
|
||||
# Specify Interval size for new data chunks storage.
|
||||
chunk_time_interval: "${SQL_TIMESCALE_CHUNK_TIME_INTERVAL:604800000}"
|
||||
batch_threads: "${SQL_TIMESCALE_BATCH_THREADS:4}"
|
||||
ttl:
|
||||
ts:
|
||||
enabled: "${SQL_TTL_TS_ENABLED:true}"
|
||||
execution_interval_ms: "${SQL_TTL_TS_EXECUTION_INTERVAL:86400000}" # Number of milliseconds. The current value corresponds to one day
|
||||
ts_key_value_ttl: "${SQL_TTL_TS_TS_KEY_VALUE_TTL:0}" # Number of seconds
|
||||
events:
|
||||
enabled: "${SQL_TTL_EVENTS_ENABLED:true}"
|
||||
execution_interval_ms: "${SQL_TTL_EVENTS_EXECUTION_INTERVAL:2220000}" # Number of milliseconds (max random initial delay and fixed period). # 37minutes to avoid common interval spikes
|
||||
events_ttl: "${SQL_TTL_EVENTS_EVENTS_TTL:0}" # Number of seconds
|
||||
debug_events_ttl: "${SQL_TTL_EVENTS_DEBUG_EVENTS_TTL:604800}" # Number of seconds. The current value corresponds to one week
|
||||
edge_events:
|
||||
enabled: "${SQL_TTL_EDGE_EVENTS_ENABLED:true}"
|
||||
execution_interval_ms: "${SQL_TTL_EDGE_EVENTS_EXECUTION_INTERVAL:86400000}" # Number of milliseconds. The current value corresponds to one day
|
||||
edge_events_ttl: "${SQL_TTL_EDGE_EVENTS_TTL:2628000}" # Number of seconds. The current value corresponds to one month
|
||||
alarms:
|
||||
checking_interval: "${SQL_ALARMS_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours
|
||||
removal_batch_size: "${SQL_ALARMS_TTL_REMOVAL_BATCH_SIZE:3000}" # To delete outdated alarms not all at once but in batches
|
||||
rpc:
|
||||
enabled: "${SQL_TTL_RPC_ENABLED:true}"
|
||||
checking_interval: "${SQL_RPC_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours
|
||||
|
||||
# Actor system parameters
|
||||
actors:
|
||||
@ -573,6 +573,9 @@ transport:
|
||||
timeout: "${CLIENT_SIDE_RPC_TIMEOUT:60000}"
|
||||
# Enable/disable http/mqtt/coap transport protocols (has higher priority than certain protocol's 'enabled' property)
|
||||
api_enabled: "${TB_TRANSPORT_API_ENABLED:true}"
|
||||
log:
|
||||
enabled: "${TB_TRANSPORT_LOG_ENABLED:true}"
|
||||
max_length: "${TB_TRANSPORT_LOG_MAX_LENGTH:1024}"
|
||||
# Local HTTP transport parameters
|
||||
http:
|
||||
enabled: "${HTTP_ENABLED:true}"
|
||||
@ -618,6 +621,8 @@ transport:
|
||||
bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}"
|
||||
bind_port: "${COAP_BIND_PORT:5683}"
|
||||
timeout: "${COAP_TIMEOUT:10000}"
|
||||
psm_activity_timer: "${COAP_PSM_ACTIVITY_TIMER:10000}"
|
||||
paging_transmission_window: "${COAP_PAGING_TRANSMISSION_WINDOW:10000}"
|
||||
dtls:
|
||||
# Enable/disable DTLS 1.2 support
|
||||
enabled: "${COAP_DTLS_ENABLED:false}"
|
||||
|
||||
@ -38,6 +38,14 @@ public class CoapServerContext {
|
||||
@Value("${transport.coap.timeout}")
|
||||
private Long timeout;
|
||||
|
||||
@Getter
|
||||
@Value("${transport.coap.psm_activity_timer:10000}")
|
||||
private long psmActivityTimer;
|
||||
|
||||
@Getter
|
||||
@Value("${transport.coap.paging_transmission_window:10000}")
|
||||
private long pagingTransmissionWindow;
|
||||
|
||||
@Getter
|
||||
@Autowired(required = false)
|
||||
private TbCoapDtlsSettings dtlsSettings;
|
||||
|
||||
@ -17,6 +17,7 @@ package org.thingsboard.server.common.data.device.data.lwm2m;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||
import lombok.Data;
|
||||
import org.thingsboard.server.common.data.device.data.PowerMode;
|
||||
import org.thingsboard.server.common.data.device.data.PowerSavingConfiguration;
|
||||
|
||||
@Data
|
||||
@ -26,6 +27,10 @@ public class OtherConfiguration extends PowerSavingConfiguration {
|
||||
private Integer fwUpdateStrategy;
|
||||
private Integer swUpdateStrategy;
|
||||
private Integer clientOnlyObserveAfterConnect;
|
||||
private PowerMode powerMode;
|
||||
private Long psmActivityTimer;
|
||||
private Long edrxCycle;
|
||||
private Long pagingTransmissionWindow;
|
||||
private String fwUpdateResource;
|
||||
private String swUpdateResource;
|
||||
private boolean compositeOperationsSupport;
|
||||
|
||||
@ -117,6 +117,8 @@ message DeviceInfoProto {
|
||||
int64 customerIdLSB = 11;
|
||||
string powerMode = 12;
|
||||
int64 edrxCycle = 13;
|
||||
int64 psmActivityTimer = 14;
|
||||
int64 pagingTransmissionWindow = 15;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -234,7 +234,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
|
||||
TbCoapClientState clientState = null;
|
||||
try {
|
||||
clientState = clients.getOrCreateClient(type, deviceCredentials, deviceProfile);
|
||||
clientState.updateLastUplinkTime();
|
||||
clients.awake(clientState);
|
||||
switch (type) {
|
||||
case POST_ATTRIBUTES_REQUEST:
|
||||
handlePostAttributesRequest(clientState, exchange, request);
|
||||
|
||||
@ -27,6 +27,7 @@ public class TbCoapMessageObserver implements MessageObserver {
|
||||
|
||||
private final int msgId;
|
||||
private final Consumer<Integer> onAcknowledge;
|
||||
private final Consumer<Integer> onTimeout;
|
||||
|
||||
@Override
|
||||
public void onRetransmission() {
|
||||
@ -50,7 +51,9 @@ public class TbCoapMessageObserver implements MessageObserver {
|
||||
|
||||
@Override
|
||||
public void onTimeout() {
|
||||
|
||||
if (onTimeout != null) {
|
||||
onTimeout.accept(msgId);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -46,4 +46,6 @@ public interface CoapClientContext {
|
||||
void registerObserveRelation(String token, ObserveRelation relation);
|
||||
|
||||
void deregisterObserveRelation(String token);
|
||||
|
||||
boolean awake(TbCoapClientState client);
|
||||
}
|
||||
|
||||
@ -22,8 +22,13 @@ import org.eclipse.californium.core.coap.Response;
|
||||
import org.eclipse.californium.core.observe.ObserveRelation;
|
||||
import org.eclipse.californium.core.server.resources.CoapExchange;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.coapserver.CoapServerContext;
|
||||
import org.thingsboard.server.coapserver.TbCoapServerComponent;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.DeviceTransportType;
|
||||
import org.thingsboard.server.common.data.device.data.PowerMode;
|
||||
import org.thingsboard.server.common.data.device.data.PowerSavingConfiguration;
|
||||
import org.thingsboard.server.common.data.device.profile.CoapDeviceProfileTransportConfiguration;
|
||||
import org.thingsboard.server.common.data.device.profile.CoapDeviceTypeConfiguration;
|
||||
import org.thingsboard.server.common.data.device.profile.DefaultCoapDeviceTypeConfiguration;
|
||||
@ -33,9 +38,11 @@ import org.thingsboard.server.common.data.device.profile.JsonTransportPayloadCon
|
||||
import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration;
|
||||
import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.msg.session.FeatureType;
|
||||
import org.thingsboard.server.common.msg.session.SessionMsgType;
|
||||
import org.thingsboard.server.common.transport.SessionMsgListener;
|
||||
import org.thingsboard.server.common.transport.TransportDeviceProfileCache;
|
||||
import org.thingsboard.server.common.transport.TransportService;
|
||||
import org.thingsboard.server.common.transport.TransportServiceCallback;
|
||||
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
|
||||
@ -50,9 +57,11 @@ import org.thingsboard.server.transport.coap.callback.AbstractSyncSessionCallbac
|
||||
import org.thingsboard.server.transport.coap.callback.CoapNoOpCallback;
|
||||
import org.thingsboard.server.transport.coap.callback.CoapOkCallback;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
@ -66,8 +75,10 @@ import static org.eclipse.californium.core.coap.Message.NONE;
|
||||
@TbCoapServerComponent
|
||||
public class DefaultCoapClientContext implements CoapClientContext {
|
||||
|
||||
private final CoapServerContext config;
|
||||
private final CoapTransportContext transportContext;
|
||||
private final TransportService transportService;
|
||||
private final TransportDeviceProfileCache profileCache;
|
||||
private final ConcurrentMap<DeviceId, TbCoapClientState> clients = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<String, TbCoapClientState> clientsByToken = new ConcurrentHashMap<>();
|
||||
|
||||
@ -148,6 +159,65 @@ public class DefaultCoapClientContext implements CoapClientContext {
|
||||
}
|
||||
}
|
||||
|
||||
private void onUplink(TbCoapClientState client) {
|
||||
PowerMode powerMode = client.getPowerMode();
|
||||
PowerSavingConfiguration profileSettings = null;
|
||||
if (powerMode == null) {
|
||||
var clientProfile = getProfile(client.getProfileId());
|
||||
if (clientProfile.isPresent()) {
|
||||
profileSettings = clientProfile.get().getClientSettings();
|
||||
powerMode = profileSettings.getPowerMode();
|
||||
if (powerMode == null) {
|
||||
powerMode = PowerMode.DRX;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (PowerMode.DRX.equals(powerMode)) {
|
||||
client.updateLastUplinkTime();
|
||||
return;
|
||||
}
|
||||
client.lock();
|
||||
try {
|
||||
long uplinkTime = client.updateLastUplinkTime();
|
||||
long timeout;
|
||||
if (PowerMode.PSM.equals(powerMode)) {
|
||||
Long psmActivityTimer = client.getPsmActivityTimer();
|
||||
if (psmActivityTimer == null && profileSettings != null) {
|
||||
psmActivityTimer = profileSettings.getPsmActivityTimer();
|
||||
|
||||
}
|
||||
if (psmActivityTimer == null || psmActivityTimer == 0L) {
|
||||
psmActivityTimer = config.getPsmActivityTimer();
|
||||
}
|
||||
|
||||
timeout = psmActivityTimer;
|
||||
} else {
|
||||
Long pagingTransmissionWindow = client.getPagingTransmissionWindow();
|
||||
if (pagingTransmissionWindow == null && profileSettings != null) {
|
||||
pagingTransmissionWindow = profileSettings.getPagingTransmissionWindow();
|
||||
|
||||
}
|
||||
if (pagingTransmissionWindow == null || pagingTransmissionWindow == 0L) {
|
||||
pagingTransmissionWindow = config.getPagingTransmissionWindow();
|
||||
}
|
||||
timeout = pagingTransmissionWindow;
|
||||
}
|
||||
Future<Void> sleepTask = client.getSleepTask();
|
||||
if (sleepTask != null) {
|
||||
sleepTask.cancel(false);
|
||||
}
|
||||
Future<Void> task = transportContext.getScheduler().schedule(() -> {
|
||||
if (uplinkTime == client.getLastUplinkTime()) {
|
||||
asleep(client);
|
||||
}
|
||||
return null;
|
||||
}, timeout, TimeUnit.MILLISECONDS);
|
||||
client.setSleepTask(task);
|
||||
} finally {
|
||||
client.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean registerFeatureObservation(TbCoapClientState state, String token, CoapExchange exchange, FeatureType featureType) {
|
||||
state.lock();
|
||||
try {
|
||||
@ -182,7 +252,9 @@ public class DefaultCoapClientContext implements CoapClientContext {
|
||||
if (state.getSession() == null) {
|
||||
TransportProtos.SessionInfoProto session = SessionInfoCreator.create(state.getCredentials(), transportContext, UUID.randomUUID());
|
||||
state.setSession(session);
|
||||
transportService.registerAsyncSession(session, new CoapSessionListener(state));
|
||||
CoapSessionListener listener = new CoapSessionListener(state);
|
||||
state.setListener(listener);
|
||||
transportService.registerAsyncSession(session, state.getListener());
|
||||
transportService.process(session, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
|
||||
}
|
||||
if (FeatureType.ATTRIBUTES.equals(featureType)) {
|
||||
@ -261,7 +333,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
|
||||
state.setAdaptor(getCoapTransportAdaptor(state.getConfiguration().isJsonPayload()));
|
||||
}
|
||||
if (state.getCredentials() == null) {
|
||||
state.setCredentials(deviceCredentials);
|
||||
state.init(deviceCredentials);
|
||||
}
|
||||
} finally {
|
||||
state.unlock();
|
||||
@ -278,10 +350,6 @@ public class DefaultCoapClientContext implements CoapClientContext {
|
||||
return clients.computeIfAbsent(deviceId, TbCoapClientState::new);
|
||||
}
|
||||
|
||||
private static DeviceId toDeviceId(TransportProtos.SessionInfoProto s) {
|
||||
return new DeviceId(new UUID(s.getDeviceIdMSB(), s.getDeviceIdLSB()));
|
||||
}
|
||||
|
||||
private static TransportProtos.SessionEventMsg getSessionEventMsg(TransportProtos.SessionEvent event) {
|
||||
return TransportProtos.SessionEventMsg.newBuilder()
|
||||
.setSessionType(TransportProtos.SessionType.ASYNC)
|
||||
@ -331,7 +399,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
|
||||
}
|
||||
|
||||
@RequiredArgsConstructor
|
||||
private class CoapSessionListener implements SessionMsgListener {
|
||||
public class CoapSessionListener implements SessionMsgListener {
|
||||
|
||||
private final TbCoapClientState state;
|
||||
|
||||
@ -355,13 +423,28 @@ public class DefaultCoapClientContext implements CoapClientContext {
|
||||
|
||||
@Override
|
||||
public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg msg) {
|
||||
if (!isDownlinkAllowed(state)) {
|
||||
log.trace("[{}] ignore downlink request cause client is sleeping.", state.getDeviceId());
|
||||
state.lock();
|
||||
try {
|
||||
state.addQueuedNotification(msg);
|
||||
} finally {
|
||||
state.unlock();
|
||||
}
|
||||
return;
|
||||
}
|
||||
log.trace("[{}] Received attributes update notification to device", sessionId);
|
||||
TbCoapObservationState attrs = state.getAttrs();
|
||||
if (attrs != null) {
|
||||
try {
|
||||
boolean conRequest = AbstractSyncSessionCallback.isConRequest(state.getAttrs());
|
||||
int requestId = getNextMsgId();
|
||||
Response response = state.getAdaptor().convertToPublish(conRequest, msg);
|
||||
response.setMID(requestId);
|
||||
attrs.getExchange().respond(response);
|
||||
if (conRequest) {
|
||||
response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> awake(state), id -> asleep(state)));
|
||||
}
|
||||
} catch (AdaptorException e) {
|
||||
log.trace("[{}] Failed to reply due to error", state.getDeviceId(), e);
|
||||
cancelObserveRelation(attrs);
|
||||
@ -372,6 +455,17 @@ public class DefaultCoapClientContext implements CoapClientContext {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
|
||||
state.onDeviceUpdate(device);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDeviceDeleted(DeviceId deviceId) {
|
||||
cancelRpcSubscription(state);
|
||||
cancelAttributeSubscription(state);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
|
||||
log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage());
|
||||
@ -382,6 +476,10 @@ public class DefaultCoapClientContext implements CoapClientContext {
|
||||
@Override
|
||||
public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg msg) {
|
||||
log.trace("[{}] Received RPC command to device", sessionId);
|
||||
if (!isDownlinkAllowed(state)) {
|
||||
log.trace("[{}] ignore downlink request cause client is sleeping.", state.getDeviceId());
|
||||
return;
|
||||
}
|
||||
boolean sent = false;
|
||||
boolean conRequest = AbstractSyncSessionCallback.isConRequest(state.getRpc());
|
||||
try {
|
||||
@ -401,7 +499,10 @@ public class DefaultCoapClientContext implements CoapClientContext {
|
||||
if (rpcRequestMsg != null) {
|
||||
transportService.process(state.getSession(), rpcRequestMsg, false, TransportServiceCallback.EMPTY);
|
||||
}
|
||||
}));
|
||||
}, null));
|
||||
}
|
||||
if (conRequest) {
|
||||
response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> awake(state), id -> asleep(state)));
|
||||
}
|
||||
state.getRpc().getExchange().respond(response);
|
||||
sent = true;
|
||||
@ -428,6 +529,143 @@ public class DefaultCoapClientContext implements CoapClientContext {
|
||||
}
|
||||
}
|
||||
|
||||
private boolean asleep(TbCoapClientState client) {
|
||||
boolean changed = compareAndSetSleepFlag(client, true);
|
||||
if (changed) {
|
||||
log.debug("[{}] client is sleeping", client.getDeviceId());
|
||||
transportService.log(client.getSession(), "Info: Client is sleeping!");
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean awake(TbCoapClientState client) {
|
||||
onUplink(client);
|
||||
boolean changed = compareAndSetSleepFlag(client, false);
|
||||
if (changed) {
|
||||
log.debug("[{}] client is awake", client.getDeviceId());
|
||||
transportService.log(client.getSession(), "Info: Client is awake!");
|
||||
sendMsgsAfterSleeping(client);
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
|
||||
private void sendMsgsAfterSleeping(TbCoapClientState client) {
|
||||
if (client.getRpc() != null) {
|
||||
TransportProtos.TransportToDeviceActorMsg persistentRpcRequestMsg = TransportProtos.TransportToDeviceActorMsg
|
||||
.newBuilder()
|
||||
.setSessionInfo(client.getSession())
|
||||
.setSendPendingRPC(TransportProtos.SendPendingRPCMsg.newBuilder().build())
|
||||
.build();
|
||||
transportService.process(persistentRpcRequestMsg, TransportServiceCallback.EMPTY);
|
||||
}
|
||||
if (client.getAttrs() != null && client.getMissedAttributeUpdates() != null) {
|
||||
client.getListener().onAttributeUpdate(new UUID(client.getSession().getSessionIdMSB(), client.getSession().getSessionIdLSB()), client.getAndClearMissedUpdates());
|
||||
}
|
||||
}
|
||||
|
||||
private boolean compareAndSetSleepFlag(TbCoapClientState client, boolean sleeping) {
|
||||
if (sleeping == client.isAsleep()) {
|
||||
log.trace("[{}] Client is already at sleeping: {}, ignoring event: {}", client.getDeviceId(), client.isAsleep(), sleeping);
|
||||
return false;
|
||||
}
|
||||
client.lock();
|
||||
try {
|
||||
if (sleeping == client.isAsleep()) {
|
||||
log.trace("[{}] Client is already at sleeping: {}, ignoring event: {}", client.getDeviceId(), client.isAsleep(), sleeping);
|
||||
return false;
|
||||
} else {
|
||||
PowerMode powerMode = getPowerMode(client);
|
||||
if (PowerMode.PSM.equals(powerMode) || PowerMode.E_DRX.equals(powerMode)) {
|
||||
log.trace("[{}] Switch sleeping from: {} to: {}", client.getDeviceId(), client.isAsleep(), sleeping);
|
||||
client.setAsleep(sleeping);
|
||||
// TODO: persist changes.
|
||||
// update(client);
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
client.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isDownlinkAllowed(TbCoapClientState client) {
|
||||
PowerMode powerMode = client.getPowerMode();
|
||||
PowerSavingConfiguration profileSettings = null;
|
||||
if (powerMode == null) {
|
||||
var clientProfile = getProfile(client.getProfileId());
|
||||
if (clientProfile.isPresent()) {
|
||||
profileSettings = clientProfile.get().getClientSettings();
|
||||
powerMode = profileSettings.getPowerMode();
|
||||
if (powerMode == null) {
|
||||
powerMode = PowerMode.DRX;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (PowerMode.DRX.equals(powerMode)) {
|
||||
return true;
|
||||
}
|
||||
client.lock();
|
||||
long timeSinceLastUplink = System.currentTimeMillis() - client.getLastUplinkTime();
|
||||
try {
|
||||
if (PowerMode.PSM.equals(powerMode)) {
|
||||
Long psmActivityTimer = client.getPsmActivityTimer();
|
||||
if (psmActivityTimer == null && profileSettings != null) {
|
||||
psmActivityTimer = profileSettings.getPsmActivityTimer();
|
||||
|
||||
}
|
||||
if (psmActivityTimer == null || psmActivityTimer == 0L) {
|
||||
psmActivityTimer = config.getPsmActivityTimer();
|
||||
}
|
||||
return timeSinceLastUplink <= psmActivityTimer;
|
||||
} else {
|
||||
Long pagingTransmissionWindow = client.getPagingTransmissionWindow();
|
||||
if (pagingTransmissionWindow == null && profileSettings != null) {
|
||||
pagingTransmissionWindow = profileSettings.getPagingTransmissionWindow();
|
||||
|
||||
}
|
||||
if (pagingTransmissionWindow == null || pagingTransmissionWindow == 0L) {
|
||||
pagingTransmissionWindow = config.getPagingTransmissionWindow();
|
||||
}
|
||||
boolean allowed = timeSinceLastUplink <= pagingTransmissionWindow;
|
||||
if (!allowed) {
|
||||
return client.checkFirstDownlink();
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
client.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private PowerMode getPowerMode(TbCoapClientState client) {
|
||||
PowerMode powerMode = client.getPowerMode();
|
||||
if (powerMode == null) {
|
||||
Optional<CoapDeviceProfileTransportConfiguration> deviceProfile = getProfile(client.getProfileId());
|
||||
if (deviceProfile.isPresent()) {
|
||||
powerMode = deviceProfile.get().getClientSettings().getPowerMode();
|
||||
} else {
|
||||
powerMode = PowerMode.PSM;
|
||||
}
|
||||
}
|
||||
return powerMode;
|
||||
}
|
||||
|
||||
public Optional<CoapDeviceProfileTransportConfiguration> getProfile(DeviceProfileId profileId) {
|
||||
DeviceProfile deviceProfile = profileCache.get(profileId);
|
||||
if (deviceProfile.getTransportType().equals(DeviceTransportType.COAP)) {
|
||||
return Optional.of((CoapDeviceProfileTransportConfiguration) deviceProfile.getProfileData().getTransportConfiguration());
|
||||
} else if (deviceProfile.getTransportType().equals(DeviceTransportType.DEFAULT)) {
|
||||
return Optional.empty();
|
||||
} else {
|
||||
log.warn("[{}] Invalid device profile type: {}", profileId, deviceProfile.getTransportType());
|
||||
throw new IllegalArgumentException("Invalid device profile type: " + deviceProfile.getTransportType());
|
||||
}
|
||||
}
|
||||
|
||||
protected int getNextMsgId() {
|
||||
return ThreadLocalRandom.current().nextInt(NONE, MAX_MID + 1);
|
||||
}
|
||||
|
||||
@ -18,12 +18,26 @@ package org.thingsboard.server.transport.coap.client;
|
||||
import lombok.Data;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import org.eclipse.leshan.server.registration.Registration;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.DeviceTransportType;
|
||||
import org.thingsboard.server.common.data.device.data.CoapDeviceTransportConfiguration;
|
||||
import org.thingsboard.server.common.data.device.data.PowerMode;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.transport.coap.TransportConfigurationContainer;
|
||||
import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
@ -37,12 +51,22 @@ public class TbCoapClientState {
|
||||
private volatile TransportConfigurationContainer configuration;
|
||||
private volatile CoapTransportAdaptor adaptor;
|
||||
private volatile ValidateDeviceCredentialsResponse credentials;
|
||||
|
||||
private volatile TransportProtos.SessionInfoProto session;
|
||||
|
||||
private volatile DefaultCoapClientContext.CoapSessionListener listener;
|
||||
private volatile TbCoapObservationState attrs;
|
||||
private volatile TbCoapObservationState rpc;
|
||||
private TransportProtos.AttributeUpdateNotificationMsg missedAttributeUpdates;
|
||||
|
||||
private DeviceProfileId profileId;
|
||||
|
||||
@Getter
|
||||
private PowerMode powerMode;
|
||||
@Getter
|
||||
private Long psmActivityTimer;
|
||||
@Getter
|
||||
private Long edrxCycle;
|
||||
@Getter
|
||||
private Long pagingTransmissionWindow;
|
||||
@Getter
|
||||
@Setter
|
||||
private boolean asleep;
|
||||
@ -59,6 +83,14 @@ public class TbCoapClientState {
|
||||
this.lock = new ReentrantLock();
|
||||
}
|
||||
|
||||
public void init(ValidateDeviceCredentialsResponse credentials) {
|
||||
this.profileId = credentials.getDeviceInfo().getDeviceProfileId();
|
||||
this.powerMode = credentials.getDeviceInfo().getPowerMode();
|
||||
this.edrxCycle = credentials.getDeviceInfo().getEdrxCycle();
|
||||
this.psmActivityTimer = credentials.getDeviceInfo().getPsmActivityTimer();
|
||||
this.pagingTransmissionWindow = credentials.getDeviceInfo().getPagingTransmissionWindow();
|
||||
}
|
||||
|
||||
public void lock() {
|
||||
lock.lock();
|
||||
}
|
||||
@ -67,10 +99,54 @@ public class TbCoapClientState {
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
public long updateLastUplinkTime(){
|
||||
public long updateLastUplinkTime() {
|
||||
this.lastUplinkTime = System.currentTimeMillis();
|
||||
this.firstEdrxDownlink = true;
|
||||
return lastUplinkTime;
|
||||
}
|
||||
|
||||
public boolean checkFirstDownlink() {
|
||||
boolean result = firstEdrxDownlink;
|
||||
firstEdrxDownlink = false;
|
||||
return result;
|
||||
}
|
||||
|
||||
public void onDeviceUpdate(Device device) {
|
||||
this.profileId = device.getDeviceProfileId();
|
||||
var data = device.getDeviceData();
|
||||
if (data.getTransportConfiguration() != null && data.getTransportConfiguration().getType().equals(DeviceTransportType.COAP)) {
|
||||
CoapDeviceTransportConfiguration configuration = (CoapDeviceTransportConfiguration) data.getTransportConfiguration();
|
||||
this.powerMode = configuration.getPowerMode();
|
||||
this.edrxCycle = configuration.getEdrxCycle();
|
||||
this.psmActivityTimer = configuration.getPsmActivityTimer();
|
||||
this.pagingTransmissionWindow = configuration.getPagingTransmissionWindow();
|
||||
}
|
||||
}
|
||||
|
||||
public void addQueuedNotification(TransportProtos.AttributeUpdateNotificationMsg msg) {
|
||||
if (missedAttributeUpdates == null) {
|
||||
missedAttributeUpdates = msg;
|
||||
} else {
|
||||
Map<String, TransportProtos.TsKvProto> updatedAttrs = new HashMap<>(missedAttributeUpdates.getSharedUpdatedCount() + msg.getSharedUpdatedCount());
|
||||
Set<String> deletedKeys = new HashSet<>(missedAttributeUpdates.getSharedDeletedCount() + msg.getSharedDeletedCount());
|
||||
for (TransportProtos.TsKvProto oldUpdatedAttrs : missedAttributeUpdates.getSharedUpdatedList()) {
|
||||
updatedAttrs.put(oldUpdatedAttrs.getKv().getKey(), oldUpdatedAttrs);
|
||||
}
|
||||
deletedKeys.addAll(msg.getSharedDeletedList());
|
||||
for (TransportProtos.TsKvProto newUpdatedAttrs : msg.getSharedUpdatedList()) {
|
||||
updatedAttrs.put(newUpdatedAttrs.getKv().getKey(), newUpdatedAttrs);
|
||||
}
|
||||
deletedKeys.addAll(msg.getSharedDeletedList());
|
||||
for (String deletedKey : msg.getSharedDeletedList()) {
|
||||
updatedAttrs.remove(deletedKey);
|
||||
}
|
||||
missedAttributeUpdates = TransportProtos.AttributeUpdateNotificationMsg.newBuilder().addAllSharedUpdated(updatedAttrs.values()).addAllSharedDeleted(deletedKeys).build();
|
||||
}
|
||||
}
|
||||
|
||||
public TransportProtos.AttributeUpdateNotificationMsg getAndClearMissedUpdates() {
|
||||
var result = this.missedAttributeUpdates;
|
||||
this.missedAttributeUpdates = null;
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
@ -126,4 +126,6 @@ public interface TransportService {
|
||||
SessionMetaData reportActivity(SessionInfoProto sessionInfo);
|
||||
|
||||
void deregisterSession(SessionInfoProto sessionInfo);
|
||||
|
||||
void log(SessionInfoProto sessionInfo, String msg);
|
||||
}
|
||||
|
||||
@ -36,4 +36,6 @@ public class TransportDeviceInfo implements Serializable {
|
||||
private PowerMode powerMode;
|
||||
private String additionalInfo;
|
||||
private Long edrxCycle;
|
||||
private Long psmActivityTimer;
|
||||
private Long pagingTransmissionWindow;
|
||||
}
|
||||
|
||||
@ -119,6 +119,12 @@ public class DefaultTransportService implements TransportService {
|
||||
|
||||
public static final String OVERWRITE_ACTIVITY_TIME = "overwriteActivityTime";
|
||||
|
||||
private final AtomicInteger atomicTs = new AtomicInteger(0);
|
||||
|
||||
@Value("${transport.log.enabled:true}")
|
||||
private boolean logEnabled;
|
||||
@Value("${transport.log.max_length:1024}")
|
||||
private int logMaxLength;
|
||||
@Value("${transport.sessions.inactivity_timeout}")
|
||||
private long sessionInactivityTimeout;
|
||||
@Value("${transport.sessions.report_timeout}")
|
||||
@ -444,6 +450,8 @@ public class DefaultTransportService implements TransportService {
|
||||
if (StringUtils.isNotEmpty(di.getPowerMode())) {
|
||||
tdi.setPowerMode(PowerMode.valueOf(di.getPowerMode()));
|
||||
tdi.setEdrxCycle(di.getEdrxCycle());
|
||||
tdi.setPsmActivityTimer(di.getPsmActivityTimer());
|
||||
tdi.setPagingTransmissionWindow(di.getPagingTransmissionWindow());
|
||||
}
|
||||
return tdi;
|
||||
}
|
||||
@ -745,6 +753,26 @@ public class DefaultTransportService implements TransportService {
|
||||
sessions.remove(toSessionId(sessionInfo));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void log(TransportProtos.SessionInfoProto sessionInfo, String msg) {
|
||||
if (!logEnabled || sessionInfo == null || StringUtils.isEmpty(msg)) {
|
||||
return;
|
||||
}
|
||||
if (msg.length() > logMaxLength) {
|
||||
msg = msg.substring(0, logMaxLength);
|
||||
}
|
||||
TransportProtos.PostTelemetryMsg.Builder request = TransportProtos.PostTelemetryMsg.newBuilder();
|
||||
TransportProtos.TsKvListProto.Builder builder = TransportProtos.TsKvListProto.newBuilder();
|
||||
builder.setTs(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) * 1000L + (atomicTs.getAndIncrement() % 1000));
|
||||
builder.addKv(TransportProtos.KeyValueProto.newBuilder()
|
||||
.setKey("transportLog")
|
||||
.setType(TransportProtos.KeyValueType.STRING_V)
|
||||
.setStringV(msg).build());
|
||||
request.addTsKvList(builder.build());
|
||||
TransportProtos.PostTelemetryMsg postTelemetryMsg = request.build();
|
||||
process(sessionInfo, postTelemetryMsg, TransportServiceCallback.EMPTY);
|
||||
}
|
||||
|
||||
private boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<?> callback) {
|
||||
return checkLimits(sessionInfo, msg, callback, 0);
|
||||
}
|
||||
|
||||
@ -89,6 +89,8 @@ transport:
|
||||
bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}"
|
||||
bind_port: "${COAP_BIND_PORT:5683}"
|
||||
timeout: "${COAP_TIMEOUT:10000}"
|
||||
psm_activity_timer: "${COAP_PSM_ACTIVITY_TIMER:10000}"
|
||||
paging_transmission_window: "${COAP_PAGING_TRANSMISSION_WINDOW:10000}"
|
||||
dtls:
|
||||
# Enable/disable DTLS 1.2 support
|
||||
enabled: "${COAP_DTLS_ENABLED:false}"
|
||||
@ -117,6 +119,9 @@ transport:
|
||||
type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}"
|
||||
# Maximum allowed string value length when processing Telemetry/Attributes JSON (0 value disables string value length check)
|
||||
max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}"
|
||||
log:
|
||||
enabled: "${TB_TRANSPORT_LOG_ENABLED:true}"
|
||||
max_length: "${TB_TRANSPORT_LOG_MAX_LENGTH:1024}"
|
||||
|
||||
queue:
|
||||
type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
|
||||
|
||||
@ -91,6 +91,9 @@ transport:
|
||||
type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}"
|
||||
# Maximum allowed string value length when processing Telemetry/Attributes JSON (0 value disables string value length check)
|
||||
max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}"
|
||||
log:
|
||||
enabled: "${TB_TRANSPORT_LOG_ENABLED:true}"
|
||||
max_length: "${TB_TRANSPORT_LOG_MAX_LENGTH:1024}"
|
||||
|
||||
queue:
|
||||
type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
|
||||
|
||||
@ -98,6 +98,9 @@ transport:
|
||||
# Enable/disable http/mqtt/coap transport protocols (has higher priority than certain protocol's 'enabled' property)
|
||||
api_enabled: "${TB_TRANSPORT_API_ENABLED:true}"
|
||||
# Local LwM2M transport parameters
|
||||
log:
|
||||
enabled: "${TB_TRANSPORT_LOG_ENABLED:true}"
|
||||
max_length: "${TB_TRANSPORT_LOG_MAX_LENGTH:1024}"
|
||||
lwm2m:
|
||||
# Enable/disable lvm2m transport protocol.
|
||||
enabled: "${LWM2M_ENABLED:true}"
|
||||
|
||||
@ -123,6 +123,10 @@ transport:
|
||||
type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}"
|
||||
# Maximum allowed string value length when processing Telemetry/Attributes JSON (0 value disables string value length check)
|
||||
max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}"
|
||||
log:
|
||||
enabled: "${TB_TRANSPORT_LOG_ENABLED:true}"
|
||||
max_length: "${TB_TRANSPORT_LOG_MAX_LENGTH:1024}"
|
||||
|
||||
|
||||
queue:
|
||||
type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
|
||||
|
||||
@ -56,6 +56,9 @@ transport:
|
||||
type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}"
|
||||
# Maximum allowed string value length when processing Telemetry/Attributes JSON (0 value disables string value length check)
|
||||
max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}"
|
||||
log:
|
||||
enabled: "${TB_TRANSPORT_LOG_ENABLED:true}"
|
||||
max_length: "${TB_TRANSPORT_LOG_MAX_LENGTH:1024}"
|
||||
|
||||
queue:
|
||||
type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user