commit
34dca26c28
@ -16,13 +16,16 @@
|
|||||||
package org.thingsboard.server.service.cf.ctx.state;
|
package org.thingsboard.server.service.cf.ctx.state;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||||
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
import org.thingsboard.script.api.tbel.TbelCfArg;
|
import org.thingsboard.script.api.tbel.TbelCfArg;
|
||||||
import org.thingsboard.script.api.tbel.TbelCfSingleValueArg;
|
import org.thingsboard.script.api.tbel.TbelCfSingleValueArg;
|
||||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||||
import org.thingsboard.server.common.data.kv.BasicKvEntry;
|
import org.thingsboard.server.common.data.kv.BasicKvEntry;
|
||||||
|
import org.thingsboard.server.common.data.kv.JsonDataEntry;
|
||||||
import org.thingsboard.server.common.data.kv.KvEntry;
|
import org.thingsboard.server.common.data.kv.KvEntry;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||||
import org.thingsboard.server.common.util.ProtoUtils;
|
import org.thingsboard.server.common.util.ProtoUtils;
|
||||||
@ -90,7 +93,15 @@ public class SingleValueArgumentEntry implements ArgumentEntry {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TbelCfArg toTbelCfArg() {
|
public TbelCfArg toTbelCfArg() {
|
||||||
return new TbelCfSingleValueArg(ts, kvEntryValue.getValue());
|
Object value = kvEntryValue.getValue();
|
||||||
|
if (kvEntryValue instanceof JsonDataEntry) {
|
||||||
|
try {
|
||||||
|
value = JacksonUtil.readValue(kvEntryValue.getValueAsString(), new TypeReference<>() {
|
||||||
|
});
|
||||||
|
} catch (Exception e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return new TbelCfSingleValueArg(ts, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -36,6 +36,7 @@ import org.thingsboard.common.util.ThingsBoardExecutors;
|
|||||||
import org.thingsboard.server.common.data.AttributeScope;
|
import org.thingsboard.server.common.data.AttributeScope;
|
||||||
import org.thingsboard.server.common.data.StringUtils;
|
import org.thingsboard.server.common.data.StringUtils;
|
||||||
import org.thingsboard.server.common.data.TenantProfile;
|
import org.thingsboard.server.common.data.TenantProfile;
|
||||||
|
import org.thingsboard.server.common.data.exception.RateLimitExceededException;
|
||||||
import org.thingsboard.server.common.data.id.CustomerId;
|
import org.thingsboard.server.common.data.id.CustomerId;
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
||||||
@ -52,7 +53,6 @@ import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
|
|||||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||||
import org.thingsboard.server.dao.util.TenantRateLimitException;
|
|
||||||
import org.thingsboard.server.exception.UnauthorizedException;
|
import org.thingsboard.server.exception.UnauthorizedException;
|
||||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
||||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||||
@ -742,7 +742,7 @@ public class DefaultWebSocketService implements WebSocketService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable e) {
|
public void onFailure(Throwable e) {
|
||||||
if (e instanceof TenantRateLimitException || e.getCause() instanceof TenantRateLimitException) {
|
if (e instanceof RateLimitExceededException || e.getCause() instanceof RateLimitExceededException) {
|
||||||
log.trace("[{}] Tenant rate limit detected for subscription: [{}]:{}", sessionRef.getSecurityCtx().getTenantId(), entityId, cmd);
|
log.trace("[{}] Tenant rate limit detected for subscription: [{}]:{}", sessionRef.getSecurityCtx().getTenantId(), entityId, cmd);
|
||||||
} else {
|
} else {
|
||||||
log.info(FAILED_TO_FETCH_DATA, e);
|
log.info(FAILED_TO_FETCH_DATA, e);
|
||||||
|
|||||||
@ -17,8 +17,15 @@ package org.thingsboard.server.service.cf.ctx.state;
|
|||||||
|
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.thingsboard.script.api.tbel.TbelCfArg;
|
||||||
|
import org.thingsboard.script.api.tbel.TbelCfSingleValueArg;
|
||||||
|
import org.thingsboard.server.common.data.kv.JsonDataEntry;
|
||||||
import org.thingsboard.server.common.data.kv.LongDataEntry;
|
import org.thingsboard.server.common.data.kv.LongDataEntry;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
||||||
|
|
||||||
@ -73,4 +80,34 @@ public class SingleValueArgumentEntryTest {
|
|||||||
void testUpdateEntryWhenValueWasNotChanged() {
|
void testUpdateEntryWhenValueWasNotChanged() {
|
||||||
assertThat(entry.updateEntry(new SingleValueArgumentEntry(ts + 18, new LongDataEntry("key", 11L), 364L))).isTrue();
|
assertThat(entry.updateEntry(new SingleValueArgumentEntry(ts + 18, new LongDataEntry("key", 11L), 364L))).isTrue();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testToTbelCfArgWhenJsonIsObject() {
|
||||||
|
entry = new SingleValueArgumentEntry(ts, new JsonDataEntry("key", "{\"test\": 10}"), 370L);
|
||||||
|
TbelCfArg tbelCfArg = entry.toTbelCfArg();
|
||||||
|
assertThat(tbelCfArg).isNotNull();
|
||||||
|
assertThat(tbelCfArg).isInstanceOf(TbelCfSingleValueArg.class);
|
||||||
|
|
||||||
|
TbelCfSingleValueArg singleValueArg = (TbelCfSingleValueArg) tbelCfArg;
|
||||||
|
|
||||||
|
assertThat(singleValueArg.getValue()).isInstanceOf(Map.class);
|
||||||
|
Map<String, Integer> expectedMap = Map.of("test", 10);
|
||||||
|
assertThat(singleValueArg.getValue()).isEqualTo(expectedMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testToTbelCfArgWhenJsonIsArray() {
|
||||||
|
entry = new SingleValueArgumentEntry(ts, new JsonDataEntry("key", "[{\"test\": 10}, {\"test2\": 20}]"), 371L);
|
||||||
|
TbelCfArg tbelCfArg = entry.toTbelCfArg();
|
||||||
|
assertThat(tbelCfArg).isNotNull();
|
||||||
|
assertThat(tbelCfArg).isInstanceOf(TbelCfSingleValueArg.class);
|
||||||
|
|
||||||
|
TbelCfSingleValueArg singleValueArg = (TbelCfSingleValueArg) tbelCfArg;
|
||||||
|
|
||||||
|
assertThat(singleValueArg.getValue()).isInstanceOf(List.class);
|
||||||
|
List<Map<String, Integer>> expectedList = new ArrayList<>();
|
||||||
|
expectedList.add(Map.of("test", 10));
|
||||||
|
expectedList.add(Map.of("test2", 20));
|
||||||
|
assertThat(singleValueArg.getValue()).isEqualTo(expectedList);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@ -89,7 +89,7 @@ public class JobManagerTest extends AbstractControllerTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSubmitJob_allTasksSuccessful() {
|
public void testSubmitJob_allTasksSuccessful() {
|
||||||
int tasksCount = 5;
|
int tasksCount = 7;
|
||||||
JobId jobId = submitJob(DummyJobConfiguration.builder()
|
JobId jobId = submitJob(DummyJobConfiguration.builder()
|
||||||
.successfulTasksCount(tasksCount)
|
.successfulTasksCount(tasksCount)
|
||||||
.taskProcessingTimeMs(1000)
|
.taskProcessingTimeMs(1000)
|
||||||
@ -154,10 +154,10 @@ public class JobManagerTest extends AbstractControllerTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCancelJob_whileRunning() throws Exception {
|
public void testCancelJob_whileRunning() throws Exception {
|
||||||
int tasksCount = 100;
|
int tasksCount = 200;
|
||||||
JobId jobId = submitJob(DummyJobConfiguration.builder()
|
JobId jobId = submitJob(DummyJobConfiguration.builder()
|
||||||
.successfulTasksCount(tasksCount)
|
.successfulTasksCount(tasksCount)
|
||||||
.taskProcessingTimeMs(100)
|
.taskProcessingTimeMs(50)
|
||||||
.build()).getId();
|
.build()).getId();
|
||||||
|
|
||||||
Thread.sleep(500);
|
Thread.sleep(500);
|
||||||
|
|||||||
@ -20,7 +20,8 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
|||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
import lombok.Setter;
|
import lombok.Setter;
|
||||||
import org.apache.commons.lang3.BooleanUtils;
|
|
||||||
|
import static org.apache.commons.lang3.BooleanUtils.toBooleanDefaultIfNull;
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
@ -34,14 +35,14 @@ public class EdqsState {
|
|||||||
private EdqsApiMode apiMode;
|
private EdqsApiMode apiMode;
|
||||||
|
|
||||||
public boolean updateEdqsReady(boolean ready) {
|
public boolean updateEdqsReady(boolean ready) {
|
||||||
boolean changed = BooleanUtils.toBooleanDefaultIfNull(this.edqsReady, false) != ready;
|
boolean changed = toBooleanDefaultIfNull(this.edqsReady, false) != ready;
|
||||||
this.edqsReady = ready;
|
this.edqsReady = ready;
|
||||||
return changed;
|
return changed;
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonIgnore
|
@JsonIgnore
|
||||||
public boolean isApiReady() {
|
public boolean isApiReady() {
|
||||||
return edqsReady && syncStatus == EdqsSyncStatus.FINISHED;
|
return toBooleanDefaultIfNull(edqsReady, false) && syncStatus == EdqsSyncStatus.FINISHED;
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonIgnore
|
@JsonIgnore
|
||||||
|
|||||||
@ -15,6 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.common.data.job.task;
|
package org.thingsboard.server.common.data.job.task;
|
||||||
|
|
||||||
|
import lombok.Builder;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.EqualsAndHashCode;
|
import lombok.EqualsAndHashCode;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
@ -25,22 +26,25 @@ import org.thingsboard.server.common.data.job.JobType;
|
|||||||
@Data
|
@Data
|
||||||
@EqualsAndHashCode(callSuper = true)
|
@EqualsAndHashCode(callSuper = true)
|
||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
@SuperBuilder
|
|
||||||
@ToString(callSuper = true)
|
@ToString(callSuper = true)
|
||||||
public class DummyTaskResult extends TaskResult {
|
public class DummyTaskResult extends TaskResult {
|
||||||
|
|
||||||
private DummyTaskFailure failure;
|
private DummyTaskFailure failure;
|
||||||
|
|
||||||
|
@Builder
|
||||||
|
private DummyTaskResult(boolean success, boolean discarded, DummyTaskFailure failure) {
|
||||||
|
super(success, discarded);
|
||||||
|
this.failure = failure;
|
||||||
|
}
|
||||||
|
|
||||||
public static DummyTaskResult success(DummyTask task) {
|
public static DummyTaskResult success(DummyTask task) {
|
||||||
return DummyTaskResult.builder()
|
return DummyTaskResult.builder()
|
||||||
.key(task.getKey())
|
|
||||||
.success(true)
|
.success(true)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static DummyTaskResult failed(DummyTask task, Throwable error) {
|
public static DummyTaskResult failed(DummyTask task, Throwable error) {
|
||||||
return DummyTaskResult.builder()
|
return DummyTaskResult.builder()
|
||||||
.key(task.getKey())
|
|
||||||
.failure(DummyTaskFailure.builder()
|
.failure(DummyTaskFailure.builder()
|
||||||
.error(error.getMessage())
|
.error(error.getMessage())
|
||||||
.number(task.getNumber())
|
.number(task.getNumber())
|
||||||
@ -51,7 +55,6 @@ public class DummyTaskResult extends TaskResult {
|
|||||||
|
|
||||||
public static DummyTaskResult discarded(DummyTask task) {
|
public static DummyTaskResult discarded(DummyTask task) {
|
||||||
return DummyTaskResult.builder()
|
return DummyTaskResult.builder()
|
||||||
.key(task.getKey())
|
|
||||||
.discarded(true)
|
.discarded(true)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -20,16 +20,12 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
|||||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||||
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
|
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
|
||||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||||
import lombok.AllArgsConstructor;
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
import lombok.experimental.SuperBuilder;
|
|
||||||
import org.thingsboard.server.common.data.job.JobType;
|
import org.thingsboard.server.common.data.job.JobType;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
@AllArgsConstructor
|
|
||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
@SuperBuilder
|
|
||||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "jobType")
|
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "jobType")
|
||||||
@JsonSubTypes({
|
@JsonSubTypes({
|
||||||
@ -40,6 +36,12 @@ public abstract class TaskResult {
|
|||||||
private String key;
|
private String key;
|
||||||
private boolean success;
|
private boolean success;
|
||||||
private boolean discarded;
|
private boolean discarded;
|
||||||
|
private long finishTs;
|
||||||
|
|
||||||
|
protected TaskResult(boolean success, boolean discarded) {
|
||||||
|
this.success = success;
|
||||||
|
this.discarded = discarded;
|
||||||
|
}
|
||||||
|
|
||||||
@JsonIgnore
|
@JsonIgnore
|
||||||
public abstract JobType getJobType();
|
public abstract JobType getJobType();
|
||||||
|
|||||||
@ -43,6 +43,7 @@ public enum LimitedApi {
|
|||||||
RateLimitUtil.merge(
|
RateLimitUtil.merge(
|
||||||
DefaultTenantProfileConfiguration::getCassandraWriteQueryTenantCoreRateLimits,
|
DefaultTenantProfileConfiguration::getCassandraWriteQueryTenantCoreRateLimits,
|
||||||
DefaultTenantProfileConfiguration::getCassandraWriteQueryTenantRuleEngineRateLimits), "Monolith telemetry Cassandra write queries", true),
|
DefaultTenantProfileConfiguration::getCassandraWriteQueryTenantRuleEngineRateLimits), "Monolith telemetry Cassandra write queries", true),
|
||||||
|
CASSANDRA_QUERIES(null, true), // left for backward compatibility with RateLimitsNotificationInfo
|
||||||
EDGE_EVENTS(DefaultTenantProfileConfiguration::getEdgeEventRateLimits, "Edge events", true),
|
EDGE_EVENTS(DefaultTenantProfileConfiguration::getEdgeEventRateLimits, "Edge events", true),
|
||||||
EDGE_EVENTS_PER_EDGE(DefaultTenantProfileConfiguration::getEdgeEventRateLimitsPerEdge, "Edge events per edge", false),
|
EDGE_EVENTS_PER_EDGE(DefaultTenantProfileConfiguration::getEdgeEventRateLimitsPerEdge, "Edge events per edge", false),
|
||||||
EDGE_UPLINK_MESSAGES(DefaultTenantProfileConfiguration::getEdgeUplinkMessagesRateLimits, "Edge uplink messages", true),
|
EDGE_UPLINK_MESSAGES(DefaultTenantProfileConfiguration::getEdgeUplinkMessagesRateLimits, "Edge uplink messages", true),
|
||||||
|
|||||||
@ -277,6 +277,7 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
|
|||||||
eventConsumer.awaitStop();
|
eventConsumer.awaitStop();
|
||||||
responseTemplate.stop();
|
responseTemplate.stop();
|
||||||
stateService.stop();
|
stateService.stop();
|
||||||
|
versionsStore.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -224,6 +224,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
|
|||||||
stateConsumer.awaitStop();
|
stateConsumer.awaitStop();
|
||||||
eventsToBackupConsumer.stop();
|
eventsToBackupConsumer.stop();
|
||||||
stateProducer.stop();
|
stateProducer.stop();
|
||||||
|
versionsStore.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,31 +15,35 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.edqs.util;
|
package org.thingsboard.server.edqs.util;
|
||||||
|
|
||||||
import com.github.benmanes.caffeine.cache.Cache;
|
|
||||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.thingsboard.server.common.data.edqs.EdqsObjectKey;
|
import org.thingsboard.server.common.data.edqs.EdqsObjectKey;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class VersionsStore {
|
public class VersionsStore {
|
||||||
|
|
||||||
private final Cache<EdqsObjectKey, Long> versions;
|
private final ConcurrentMap<EdqsObjectKey, TimedValue> versions = new ConcurrentHashMap<>();
|
||||||
|
private final long expirationMillis;
|
||||||
|
private final ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
|
||||||
public VersionsStore(int ttlMinutes) {
|
public VersionsStore(int ttlMinutes) {
|
||||||
this.versions = Caffeine.newBuilder()
|
this.expirationMillis = TimeUnit.MINUTES.toMillis(ttlMinutes);
|
||||||
.expireAfterWrite(ttlMinutes, TimeUnit.MINUTES)
|
startCleanupTask();
|
||||||
.build();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isNew(EdqsObjectKey key, Long version) {
|
public boolean isNew(EdqsObjectKey key, Long version) {
|
||||||
AtomicBoolean isNew = new AtomicBoolean(false);
|
AtomicBoolean isNew = new AtomicBoolean(false);
|
||||||
versions.asMap().compute(key, (k, prevVersion) -> {
|
versions.compute(key, (k, prevVersion) -> {
|
||||||
if (prevVersion == null || prevVersion <= version) {
|
if (prevVersion == null || prevVersion.value <= version) {
|
||||||
isNew.set(true);
|
isNew.set(true);
|
||||||
return version;
|
return new TimedValue(version);
|
||||||
} else {
|
} else {
|
||||||
log.debug("[{}] Version {} is outdated, the latest is {}", key, version, prevVersion);
|
log.debug("[{}] Version {} is outdated, the latest is {}", key, version, prevVersion);
|
||||||
return prevVersion;
|
return prevVersion;
|
||||||
@ -48,4 +52,33 @@ public class VersionsStore {
|
|||||||
return isNew.get();
|
return isNew.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void startCleanupTask() {
|
||||||
|
cleaner.scheduleAtFixedRate(() -> {
|
||||||
|
try {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
for (Map.Entry<EdqsObjectKey, TimedValue> entry : versions.entrySet()) {
|
||||||
|
if (now - entry.getValue().lastUpdated > expirationMillis) {
|
||||||
|
versions.remove(entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Cleanup task failed", e);
|
||||||
|
}
|
||||||
|
}, expirationMillis, expirationMillis, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void shutdown() {
|
||||||
|
cleaner.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class TimedValue {
|
||||||
|
private final long lastUpdated;
|
||||||
|
private final long value;
|
||||||
|
|
||||||
|
public TimedValue(long value) {
|
||||||
|
this.value = value;
|
||||||
|
this.lastUpdated = System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -232,6 +232,8 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void reportTaskResult(T task, R result) {
|
private void reportTaskResult(T task, R result) {
|
||||||
|
result.setKey(task.getKey());
|
||||||
|
result.setFinishTs(System.currentTimeMillis());
|
||||||
statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result);
|
statsService.reportTaskResult(task.getTenantId(), task.getJobId(), result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -69,7 +69,6 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
|
|||||||
job.setStatus(QUEUED);
|
job.setStatus(QUEUED);
|
||||||
} else {
|
} else {
|
||||||
job.setStatus(PENDING);
|
job.setStatus(PENDING);
|
||||||
job.getResult().setStartTs(System.currentTimeMillis());
|
|
||||||
}
|
}
|
||||||
return saveJob(tenantId, job, true, null);
|
return saveJob(tenantId, job, true, null);
|
||||||
}
|
}
|
||||||
@ -125,6 +124,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
|
|||||||
}
|
}
|
||||||
|
|
||||||
boolean publishEvent = false;
|
boolean publishEvent = false;
|
||||||
|
long lastFinishTs = 0;
|
||||||
for (TaskResult taskResult : jobStats.getTaskResults()) {
|
for (TaskResult taskResult : jobStats.getTaskResults()) {
|
||||||
if (!taskResult.getKey().equals(job.getConfiguration().getTasksKey())) {
|
if (!taskResult.getKey().equals(job.getConfiguration().getTasksKey())) {
|
||||||
log.debug("Ignoring task result {} with outdated key {}", taskResult, job.getConfiguration().getTasksKey());
|
log.debug("Ignoring task result {} with outdated key {}", taskResult, job.getConfiguration().getTasksKey());
|
||||||
@ -140,6 +140,9 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
|
|||||||
publishEvent = true;
|
publishEvent = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (taskResult.getFinishTs() > lastFinishTs) {
|
||||||
|
lastFinishTs = taskResult.getFinishTs();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (job.getStatus() == RUNNING) {
|
if (job.getStatus() == RUNNING) {
|
||||||
@ -153,7 +156,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
|
|||||||
job.setStatus(COMPLETED);
|
job.setStatus(COMPLETED);
|
||||||
publishEvent = true;
|
publishEvent = true;
|
||||||
}
|
}
|
||||||
result.setFinishTs(System.currentTimeMillis());
|
result.setFinishTs(lastFinishTs);
|
||||||
job.getConfiguration().setToReprocess(null);
|
job.getConfiguration().setToReprocess(null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -166,6 +169,9 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
|
|||||||
if (!Job.SUPPORTED_ENTITY_TYPES.contains(job.getEntityId().getEntityType())) {
|
if (!Job.SUPPORTED_ENTITY_TYPES.contains(job.getEntityId().getEntityType())) {
|
||||||
throw new IllegalArgumentException("Unsupported entity type " + job.getEntityId().getEntityType());
|
throw new IllegalArgumentException("Unsupported entity type " + job.getEntityId().getEntityType());
|
||||||
}
|
}
|
||||||
|
if (job.getStatus() == PENDING) {
|
||||||
|
job.getResult().setStartTs(System.currentTimeMillis());
|
||||||
|
}
|
||||||
|
|
||||||
job = jobDao.save(tenantId, job);
|
job = jobDao.save(tenantId, job);
|
||||||
if (publishEvent) {
|
if (publishEvent) {
|
||||||
|
|||||||
@ -33,7 +33,6 @@ import org.thingsboard.server.common.data.notification.rule.DefaultNotificationR
|
|||||||
import org.thingsboard.server.common.data.notification.rule.EscalatedNotificationRuleRecipientsConfig;
|
import org.thingsboard.server.common.data.notification.rule.EscalatedNotificationRuleRecipientsConfig;
|
||||||
import org.thingsboard.server.common.data.notification.rule.NotificationRule;
|
import org.thingsboard.server.common.data.notification.rule.NotificationRule;
|
||||||
import org.thingsboard.server.common.data.notification.rule.NotificationRuleConfig;
|
import org.thingsboard.server.common.data.notification.rule.NotificationRuleConfig;
|
||||||
import org.thingsboard.server.common.data.notification.rule.trigger.ResourcesShortageTrigger.Resource;
|
|
||||||
import org.thingsboard.server.common.data.notification.rule.trigger.config.AlarmAssignmentNotificationRuleTriggerConfig;
|
import org.thingsboard.server.common.data.notification.rule.trigger.config.AlarmAssignmentNotificationRuleTriggerConfig;
|
||||||
import org.thingsboard.server.common.data.notification.rule.trigger.config.AlarmCommentNotificationRuleTriggerConfig;
|
import org.thingsboard.server.common.data.notification.rule.trigger.config.AlarmCommentNotificationRuleTriggerConfig;
|
||||||
import org.thingsboard.server.common.data.notification.rule.trigger.config.AlarmNotificationRuleTriggerConfig;
|
import org.thingsboard.server.common.data.notification.rule.trigger.config.AlarmNotificationRuleTriggerConfig;
|
||||||
|
|||||||
@ -205,8 +205,8 @@ public class BaseTimeseriesService implements TimeseriesService {
|
|||||||
ListenableFuture<Integer> dpsFuture = saveTs ? Futures.transform(Futures.allAsList(tsFutures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor()) : Futures.immediateFuture(0);
|
ListenableFuture<Integer> dpsFuture = saveTs ? Futures.transform(Futures.allAsList(tsFutures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor()) : Futures.immediateFuture(0);
|
||||||
ListenableFuture<List<Long>> versionsFuture = saveLatest ? Futures.allAsList(latestFutures) : Futures.immediateFuture(null);
|
ListenableFuture<List<Long>> versionsFuture = saveLatest ? Futures.allAsList(latestFutures) : Futures.immediateFuture(null);
|
||||||
return Futures.whenAllComplete(dpsFuture, versionsFuture).call(() -> {
|
return Futures.whenAllComplete(dpsFuture, versionsFuture).call(() -> {
|
||||||
Integer dataPoints = Futures.getUnchecked(dpsFuture);
|
Integer dataPoints = dpsFuture.get();
|
||||||
List<Long> versions = Futures.getUnchecked(versionsFuture);
|
List<Long> versions = versionsFuture.get();
|
||||||
return TimeseriesSaveResult.of(dataPoints, versions);
|
return TimeseriesSaveResult.of(dataPoints, versions);
|
||||||
}, MoreExecutors.directExecutor());
|
}, MoreExecutors.directExecutor());
|
||||||
}
|
}
|
||||||
@ -298,13 +298,13 @@ public class BaseTimeseriesService implements TimeseriesService {
|
|||||||
long interval = query.getInterval();
|
long interval = query.getInterval();
|
||||||
if (interval < 1) {
|
if (interval < 1) {
|
||||||
throw new IncorrectParameterException("Invalid TsKvQuery: 'interval' must be greater than 0, but got " + interval +
|
throw new IncorrectParameterException("Invalid TsKvQuery: 'interval' must be greater than 0, but got " + interval +
|
||||||
". Please check your query parameters and ensure 'endTs' is greater than 'startTs' or increase 'interval'.");
|
". Please check your query parameters and ensure 'endTs' is greater than 'startTs' or increase 'interval'.");
|
||||||
}
|
}
|
||||||
long step = Math.max(interval, 1000);
|
long step = Math.max(interval, 1000);
|
||||||
long intervalCounts = (query.getEndTs() - query.getStartTs()) / step;
|
long intervalCounts = (query.getEndTs() - query.getStartTs()) / step;
|
||||||
if (intervalCounts > maxTsIntervals || intervalCounts < 0) {
|
if (intervalCounts > maxTsIntervals || intervalCounts < 0) {
|
||||||
throw new IncorrectParameterException("Incorrect TsKvQuery. Number of intervals is to high - " + intervalCounts + ". " +
|
throw new IncorrectParameterException("Incorrect TsKvQuery. Number of intervals is to high - " + intervalCounts + ". " +
|
||||||
"Please increase 'interval' parameter for your query or reduce the time range of the query.");
|
"Please increase 'interval' parameter for your query or reduce the time range of the query.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -32,6 +32,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import org.thingsboard.common.util.ThingsBoardExecutors;
|
import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||||
import org.thingsboard.server.cache.limits.RateLimitService;
|
import org.thingsboard.server.cache.limits.RateLimitService;
|
||||||
|
import org.thingsboard.server.common.data.exception.RateLimitExceededException;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.limit.LimitedApi;
|
import org.thingsboard.server.common.data.limit.LimitedApi;
|
||||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||||
@ -66,7 +67,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
|
|||||||
|
|
||||||
private final long maxWaitTime;
|
private final long maxWaitTime;
|
||||||
private final long pollMs;
|
private final long pollMs;
|
||||||
private final String bufferName;
|
private final String bufferName;
|
||||||
private final BlockingQueue<AsyncTaskContext<T, V>> queue;
|
private final BlockingQueue<AsyncTaskContext<T, V>> queue;
|
||||||
private final ExecutorService dispatcherExecutor;
|
private final ExecutorService dispatcherExecutor;
|
||||||
private final ExecutorService callbackExecutor;
|
private final ExecutorService callbackExecutor;
|
||||||
@ -124,7 +125,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
|
|||||||
if (!rateLimitService.checkRateLimit(myLimitedApi, tenantId, tenantId, true)) {
|
if (!rateLimitService.checkRateLimit(myLimitedApi, tenantId, tenantId, true)) {
|
||||||
stats.incrementRateLimitedTenant(tenantId);
|
stats.incrementRateLimitedTenant(tenantId);
|
||||||
stats.getTotalRateLimited().increment();
|
stats.getTotalRateLimited().increment();
|
||||||
settableFuture.setException(new TenantRateLimitException());
|
settableFuture.setException(new RateLimitExceededException(myLimitedApi));
|
||||||
perTenantLimitReached = true;
|
perTenantLimitReached = true;
|
||||||
}
|
}
|
||||||
} else if (tenantId == null) {
|
} else if (tenantId == null) {
|
||||||
@ -299,9 +300,9 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
|
|||||||
.count();
|
.count();
|
||||||
|
|
||||||
if (queueSize > 0
|
if (queueSize > 0
|
||||||
|| rateLimitedTenantsCount > 0
|
|| rateLimitedTenantsCount > 0
|
||||||
|| concurrencyLevel.get() > 0
|
|| concurrencyLevel.get() > 0
|
||||||
|| stats.getStatsCounters().stream().anyMatch(counter -> counter.get() > 0)
|
|| stats.getStatsCounters().stream().anyMatch(counter -> counter.get() > 0)
|
||||||
) {
|
) {
|
||||||
StringBuilder statsBuilder = new StringBuilder();
|
StringBuilder statsBuilder = new StringBuilder();
|
||||||
|
|
||||||
|
|||||||
@ -1,19 +0,0 @@
|
|||||||
/**
|
|
||||||
* Copyright © 2016-2025 The Thingsboard Authors
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.thingsboard.server.dao.util;
|
|
||||||
|
|
||||||
public class TenantRateLimitException extends Exception {
|
|
||||||
}
|
|
||||||
@ -556,22 +556,6 @@ public class MqttClientTest extends AbstractContainerTest {
|
|||||||
assertThat(provisionResponse.get("status").asText()).isEqualTo("NOT_FOUND");
|
assertThat(provisionResponse.get("status").asText()).isEqualTo("NOT_FOUND");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void regularDisconnect() throws Exception {
|
|
||||||
DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId());
|
|
||||||
|
|
||||||
MqttMessageListener listener = new MqttMessageListener();
|
|
||||||
MqttClient mqttClient = getMqttClient(deviceCredentials, listener, MqttVersion.MQTT_5);
|
|
||||||
final List<Byte> returnCodeByteValue = new ArrayList<>();
|
|
||||||
MqttClientCallback callbackForDisconnectWithReturnCode = getCallbackWrapperForDisconnectWithReturnCode(returnCodeByteValue);
|
|
||||||
mqttClient.setCallback(callbackForDisconnectWithReturnCode);
|
|
||||||
mqttClient.disconnect();
|
|
||||||
Thread.sleep(1000);
|
|
||||||
assertThat(returnCodeByteValue.size()).isEqualTo(1);
|
|
||||||
MqttReasonCodes.Disconnect returnCode = MqttReasonCodes.Disconnect.valueOf(returnCodeByteValue.get(0));
|
|
||||||
assertThat(returnCode).isEqualTo(MqttReasonCodes.Disconnect.NORMAL_DISCONNECT);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void clientSessionTakenOverDisconnect() throws Exception {
|
public void clientSessionTakenOverDisconnect() throws Exception {
|
||||||
DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId());
|
DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId());
|
||||||
|
|||||||
@ -96,6 +96,8 @@ final class MqttClientImpl implements MqttClient {
|
|||||||
|
|
||||||
private final ListeningExecutor handlerExecutor;
|
private final ListeningExecutor handlerExecutor;
|
||||||
|
|
||||||
|
private final static int DISCONNECT_FALLBACK_DELAY_SECS = 1;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct the MqttClientImpl with default config
|
* Construct the MqttClientImpl with default config
|
||||||
*/
|
*/
|
||||||
@ -456,16 +458,25 @@ final class MqttClientImpl implements MqttClient {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void disconnect() {
|
public void disconnect() {
|
||||||
|
if (disconnected) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
log.trace("[{}] Disconnecting from server", channel != null ? channel.id() : "UNKNOWN");
|
log.trace("[{}] Disconnecting from server", channel != null ? channel.id() : "UNKNOWN");
|
||||||
disconnected = true;
|
|
||||||
if (this.channel != null) {
|
if (this.channel != null) {
|
||||||
MqttMessage message = new MqttMessage(new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0));
|
MqttMessage message = new MqttMessage(new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0));
|
||||||
ChannelFuture channelFuture = this.sendAndFlushPacket(message);
|
|
||||||
|
sendAndFlushPacket(message).addListener((ChannelFutureListener) future -> {
|
||||||
|
future.channel().close();
|
||||||
|
disconnected = true;
|
||||||
|
});
|
||||||
eventLoop.schedule(() -> {
|
eventLoop.schedule(() -> {
|
||||||
if (!channelFuture.isDone()) {
|
if (channel.isOpen()) {
|
||||||
|
log.trace("[{}] Channel still open after {} second; forcing close now", channel.id(), DISCONNECT_FALLBACK_DELAY_SECS);
|
||||||
this.channel.close();
|
this.channel.close();
|
||||||
|
disconnected = true;
|
||||||
}
|
}
|
||||||
}, 500, TimeUnit.MILLISECONDS);
|
}, DISCONNECT_FALLBACK_DELAY_SECS, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -119,6 +119,26 @@ class MqttClientTest {
|
|||||||
assertThat(client.isConnected()).isTrue();
|
assertThat(client.isConnected()).isTrue();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testDisconnectFromBroker() {
|
||||||
|
// GIVEN
|
||||||
|
var clientConfig = new MqttClientConfig();
|
||||||
|
clientConfig.setOwnerId("Test[Disconnect]");
|
||||||
|
clientConfig.setClientId("disconnect");
|
||||||
|
|
||||||
|
client = MqttClient.create(clientConfig, null, handlerExecutor);
|
||||||
|
|
||||||
|
connect(broker.getHost(), broker.getMqttPort());
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
client.disconnect();
|
||||||
|
|
||||||
|
// THEN
|
||||||
|
Awaitility.await("waiting for client to disconnect")
|
||||||
|
.atMost(Duration.ofSeconds(5))
|
||||||
|
.untilAsserted(() -> assertThat(client.isConnected()).isFalse());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testDisconnectDueToKeepAliveIfNoActivity() {
|
void testDisconnectDueToKeepAliveIfNoActivity() {
|
||||||
// GIVEN
|
// GIVEN
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user