Merge branch 'master' of https://github.com/thingsboard/thingsboard into UITests_AssetsProfiles
This commit is contained in:
commit
04da9bac51
@ -104,6 +104,7 @@ import org.thingsboard.server.service.script.RuleNodeTbelScriptEngine;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
@ -788,6 +789,12 @@ class DefaultTbContext implements TbContext {
|
||||
return metaData;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void schedule(Runnable runnable, long delay, TimeUnit timeUnit) {
|
||||
mainCtx.getScheduler().schedule(runnable, delay, timeUnit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkTenantEntity(EntityId entityId) {
|
||||
if (!this.getTenantId().equals(TenantIdLoader.findTenantId(this, entityId))) {
|
||||
|
||||
@ -69,6 +69,7 @@ import org.thingsboard.server.dao.widget.WidgetsBundleService;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
@ -201,6 +202,8 @@ public interface TbContext {
|
||||
*
|
||||
*/
|
||||
|
||||
void schedule(Runnable runnable, long delay, TimeUnit timeUnit);
|
||||
|
||||
void checkTenantEntity(EntityId entityId);
|
||||
|
||||
boolean isLocalEntity(EntityId entityId);
|
||||
|
||||
@ -0,0 +1,22 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.deduplication;
|
||||
|
||||
public enum DeduplicationId {
|
||||
|
||||
ORIGINATOR, TENANT, CUSTOMER
|
||||
|
||||
}
|
||||
@ -0,0 +1,22 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.deduplication;
|
||||
|
||||
public enum DeduplicationStrategy {
|
||||
|
||||
FIRST, LAST, ALL
|
||||
|
||||
}
|
||||
@ -0,0 +1,237 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.deduplication;
|
||||
|
||||
import com.fasterxml.jackson.databind.node.ArrayNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.util.Pair;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.rule.engine.api.RuleNode;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNode;
|
||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||
import org.thingsboard.rule.engine.api.TbRelationTypes;
|
||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentType;
|
||||
import org.thingsboard.server.common.data.util.TbPair;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@RuleNode(
|
||||
type = ComponentType.ACTION,
|
||||
name = "deduplication",
|
||||
configClazz = TbMsgDeduplicationNodeConfiguration.class,
|
||||
nodeDescription = "Deduplicate messages for a configurable period based on a specified deduplication strategy.",
|
||||
nodeDetails = "Rule node allows you to select one of the following strategy to deduplicate messages: <br></br>" +
|
||||
"<b>FIRST</b> - return first message that arrived during deduplication period.<br></br>" +
|
||||
"<b>LAST</b> - return last message that arrived during deduplication period.<br></br>" +
|
||||
"<b>ALL</b> - return all messages as a single JSON array message. Where each element represents object with <b>msg</b> and <b>metadata</b> inner properties.<br></br>",
|
||||
icon = "content_copy",
|
||||
uiResources = {"static/rulenode/rulenode-core-config.js"},
|
||||
configDirective = "tbActionNodeMsgDeduplicationConfig"
|
||||
)
|
||||
@Slf4j
|
||||
public class TbMsgDeduplicationNode implements TbNode {
|
||||
|
||||
private static final String TB_MSG_DEDUPLICATION_TIMEOUT_MSG = "TbMsgDeduplicationNodeMsg";
|
||||
private static final int TB_MSG_DEDUPLICATION_TIMEOUT = 5000;
|
||||
public static final int TB_MSG_DEDUPLICATION_RETRY_DELAY = 10;
|
||||
|
||||
private TbMsgDeduplicationNodeConfiguration config;
|
||||
|
||||
private final Map<EntityId, List<TbMsg>> deduplicationMap;
|
||||
private long deduplicationInterval;
|
||||
private long lastScheduledTs;
|
||||
private DeduplicationId deduplicationId;
|
||||
|
||||
public TbMsgDeduplicationNode() {
|
||||
this.deduplicationMap = new HashMap<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||||
this.config = TbNodeUtils.convert(configuration, TbMsgDeduplicationNodeConfiguration.class);
|
||||
this.deduplicationInterval = TimeUnit.SECONDS.toMillis(config.getInterval());
|
||||
this.deduplicationId = config.getId();
|
||||
scheduleTickMsg(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
|
||||
if (TB_MSG_DEDUPLICATION_TIMEOUT_MSG.equals(msg.getType())) {
|
||||
try {
|
||||
processDeduplication(ctx);
|
||||
} finally {
|
||||
scheduleTickMsg(ctx);
|
||||
}
|
||||
} else {
|
||||
processOnRegularMsg(ctx, msg);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
deduplicationMap.clear();
|
||||
}
|
||||
|
||||
private void processOnRegularMsg(TbContext ctx, TbMsg msg) {
|
||||
EntityId id = getDeduplicationId(ctx, msg);
|
||||
List<TbMsg> deduplicationMsgs = deduplicationMap.computeIfAbsent(id, k -> new LinkedList<>());
|
||||
if (deduplicationMsgs.size() < config.getMaxPendingMsgs()) {
|
||||
log.trace("[{}][{}] Adding msg: [{}][{}] to the pending msgs map ...", ctx.getSelfId(), id, msg.getId(), msg.getMetaDataTs());
|
||||
deduplicationMsgs.add(msg);
|
||||
ctx.ack(msg);
|
||||
} else {
|
||||
log.trace("[{}] Max limit of pending messages reached for deduplication id: [{}]", ctx.getSelfId(), id);
|
||||
ctx.tellFailure(msg, new RuntimeException("[" + ctx.getSelfId() + "] Max limit of pending messages reached for deduplication id: [" + id + "]"));
|
||||
}
|
||||
}
|
||||
|
||||
private EntityId getDeduplicationId(TbContext ctx, TbMsg msg) {
|
||||
switch (deduplicationId) {
|
||||
case ORIGINATOR:
|
||||
return msg.getOriginator();
|
||||
case TENANT:
|
||||
return ctx.getTenantId();
|
||||
case CUSTOMER:
|
||||
return msg.getCustomerId();
|
||||
default:
|
||||
throw new IllegalStateException("Unsupported deduplication id: " + deduplicationId);
|
||||
}
|
||||
}
|
||||
|
||||
private void processDeduplication(TbContext ctx) {
|
||||
if (deduplicationMap.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
List<TbMsg> deduplicationResults = new ArrayList<>();
|
||||
long deduplicationTimeoutMs = System.currentTimeMillis();
|
||||
deduplicationMap.forEach((entityId, tbMsgs) -> {
|
||||
if (tbMsgs.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
Optional<TbPair<Long, Long>> packBoundsOpt = findValidPack(tbMsgs, deduplicationTimeoutMs);
|
||||
while (packBoundsOpt.isPresent()) {
|
||||
TbPair<Long, Long> packBounds = packBoundsOpt.get();
|
||||
if (DeduplicationStrategy.ALL.equals(config.getStrategy())) {
|
||||
List<TbMsg> pack = new ArrayList<>();
|
||||
for (Iterator<TbMsg> iterator = tbMsgs.iterator(); iterator.hasNext(); ) {
|
||||
TbMsg msg = iterator.next();
|
||||
long msgTs = msg.getMetaDataTs();
|
||||
if (msgTs >= packBounds.getFirst() && msgTs < packBounds.getSecond()) {
|
||||
pack.add(msg);
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
deduplicationResults.add(TbMsg.newMsg(
|
||||
config.getQueueName(),
|
||||
config.getOutMsgType(),
|
||||
entityId,
|
||||
getMetadata(),
|
||||
getMergedData(pack)));
|
||||
} else {
|
||||
TbMsg resultMsg = null;
|
||||
boolean searchMin = DeduplicationStrategy.FIRST.equals(config.getStrategy());
|
||||
for (Iterator<TbMsg> iterator = tbMsgs.iterator(); iterator.hasNext(); ) {
|
||||
TbMsg msg = iterator.next();
|
||||
long msgTs = msg.getMetaDataTs();
|
||||
if (msgTs >= packBounds.getFirst() && msgTs < packBounds.getSecond()) {
|
||||
iterator.remove();
|
||||
if (resultMsg == null
|
||||
|| (searchMin && msg.getMetaDataTs() < resultMsg.getMetaDataTs())
|
||||
|| (!searchMin && msg.getMetaDataTs() > resultMsg.getMetaDataTs())) {
|
||||
resultMsg = msg;
|
||||
}
|
||||
}
|
||||
}
|
||||
deduplicationResults.add(resultMsg);
|
||||
}
|
||||
packBoundsOpt = findValidPack(tbMsgs, deduplicationTimeoutMs);
|
||||
}
|
||||
});
|
||||
deduplicationResults.forEach(outMsg -> enqueueForTellNextWithRetry(ctx, outMsg, 0));
|
||||
}
|
||||
|
||||
private Optional<TbPair<Long, Long>> findValidPack(List<TbMsg> msgs, long deduplicationTimeoutMs) {
|
||||
Optional<TbMsg> min = msgs.stream().min(Comparator.comparing(TbMsg::getMetaDataTs));
|
||||
return min.map(minTsMsg -> {
|
||||
long packStartTs = minTsMsg.getMetaDataTs();
|
||||
long packEndTs = packStartTs + deduplicationInterval;
|
||||
if (packEndTs <= deduplicationTimeoutMs) {
|
||||
return new TbPair<>(packStartTs, packEndTs);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
private void enqueueForTellNextWithRetry(TbContext ctx, TbMsg msg, int retryAttempt) {
|
||||
if (config.getMaxRetries() > retryAttempt) {
|
||||
ctx.enqueueForTellNext(msg, TbRelationTypes.SUCCESS,
|
||||
() -> {
|
||||
log.trace("[{}][{}][{}] Successfully enqueue deduplication result message!", ctx.getSelfId(), msg.getOriginator(), retryAttempt);
|
||||
},
|
||||
throwable -> {
|
||||
log.trace("[{}][{}][{}] Failed to enqueue deduplication output message due to: ", ctx.getSelfId(), msg.getOriginator(), retryAttempt, throwable);
|
||||
ctx.schedule(() -> {
|
||||
enqueueForTellNextWithRetry(ctx, msg, retryAttempt + 1);
|
||||
}, TB_MSG_DEDUPLICATION_RETRY_DELAY, TimeUnit.SECONDS);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void scheduleTickMsg(TbContext ctx) {
|
||||
long curTs = System.currentTimeMillis();
|
||||
if (lastScheduledTs == 0L) {
|
||||
lastScheduledTs = curTs;
|
||||
}
|
||||
lastScheduledTs += TB_MSG_DEDUPLICATION_TIMEOUT;
|
||||
long curDelay = Math.max(0L, (lastScheduledTs - curTs));
|
||||
TbMsg tickMsg = ctx.newMsg(null, TB_MSG_DEDUPLICATION_TIMEOUT_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
|
||||
ctx.tellSelf(tickMsg, curDelay);
|
||||
}
|
||||
|
||||
private String getMergedData(List<TbMsg> msgs) {
|
||||
ArrayNode mergedData = JacksonUtil.OBJECT_MAPPER.createArrayNode();
|
||||
msgs.forEach(msg -> {
|
||||
ObjectNode msgNode = JacksonUtil.newObjectNode();
|
||||
msgNode.set("msg", JacksonUtil.toJsonNode(msg.getData()));
|
||||
msgNode.set("metadata", JacksonUtil.valueToTree(msg.getMetaData().getData()));
|
||||
mergedData.add(msgNode);
|
||||
});
|
||||
return JacksonUtil.toString(mergedData);
|
||||
}
|
||||
|
||||
private TbMsgMetaData getMetadata() {
|
||||
TbMsgMetaData metaData = new TbMsgMetaData();
|
||||
metaData.putValue("ts", String.valueOf(System.currentTimeMillis()));
|
||||
return metaData;
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,46 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.deduplication;
|
||||
|
||||
import lombok.Data;
|
||||
import org.thingsboard.rule.engine.api.NodeConfiguration;
|
||||
|
||||
@Data
|
||||
public class TbMsgDeduplicationNodeConfiguration implements NodeConfiguration<TbMsgDeduplicationNodeConfiguration> {
|
||||
|
||||
private int interval;
|
||||
private DeduplicationId id;
|
||||
private DeduplicationStrategy strategy;
|
||||
|
||||
// Advanced settings:
|
||||
private int maxPendingMsgs;
|
||||
private int maxRetries;
|
||||
|
||||
// only for DeduplicationStrategy.ALL:
|
||||
private String outMsgType;
|
||||
private String queueName;
|
||||
|
||||
@Override
|
||||
public TbMsgDeduplicationNodeConfiguration defaultConfiguration() {
|
||||
TbMsgDeduplicationNodeConfiguration configuration = new TbMsgDeduplicationNodeConfiguration();
|
||||
configuration.setInterval(60);
|
||||
configuration.setId(DeduplicationId.ORIGINATOR);
|
||||
configuration.setStrategy(DeduplicationStrategy.FIRST);
|
||||
configuration.setMaxPendingMsgs(100);
|
||||
configuration.setMaxRetries(3);
|
||||
return configuration;
|
||||
}
|
||||
}
|
||||
@ -48,7 +48,6 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
|
||||
uiResources = {"static/rulenode/rulenode-core-config.js"},
|
||||
configDirective = "tbActionNodeMsgDelayConfig"
|
||||
)
|
||||
|
||||
public class TbMsgDelayNode implements TbNode {
|
||||
|
||||
private static final String TB_MSG_DELAY_NODE_MSG = "TbMsgDelayNodeMsg";
|
||||
|
||||
File diff suppressed because one or more lines are too long
@ -0,0 +1,402 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.action;
|
||||
|
||||
import com.fasterxml.jackson.databind.node.ArrayNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.ArgumentMatchers;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||
import org.thingsboard.rule.engine.api.TbRelationTypes;
|
||||
import org.thingsboard.rule.engine.deduplication.DeduplicationStrategy;
|
||||
import org.thingsboard.rule.engine.deduplication.TbMsgDeduplicationNode;
|
||||
import org.thingsboard.rule.engine.deduplication.TbMsgDeduplicationNodeConfiguration;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.RuleNodeId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
import org.thingsboard.server.common.msg.session.SessionMsgType;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.ArgumentMatchers.isNull;
|
||||
import static org.mockito.ArgumentMatchers.nullable;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@Slf4j
|
||||
public class TbMsgDeduplicationNodeTest {
|
||||
|
||||
private static final String MAIN_QUEUE_NAME = "Main";
|
||||
private static final String HIGH_PRIORITY_QUEUE_NAME = "HighPriority";
|
||||
private static final String TB_MSG_DEDUPLICATION_TIMEOUT_MSG = "TbMsgDeduplicationNodeMsg";
|
||||
|
||||
private TbContext ctx;
|
||||
|
||||
private final ThingsBoardThreadFactory factory = ThingsBoardThreadFactory.forName("de-duplication-node-test");
|
||||
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(factory);
|
||||
private final int deduplicationInterval = 1;
|
||||
|
||||
private TenantId tenantId;
|
||||
|
||||
private TbMsgDeduplicationNode node;
|
||||
private TbMsgDeduplicationNodeConfiguration config;
|
||||
private TbNodeConfiguration nodeConfiguration;
|
||||
|
||||
private CountDownLatch awaitTellSelfLatch;
|
||||
|
||||
@BeforeEach
|
||||
public void init() throws TbNodeException {
|
||||
ctx = mock(TbContext.class);
|
||||
|
||||
tenantId = TenantId.fromUUID(UUID.randomUUID());
|
||||
RuleNodeId ruleNodeId = new RuleNodeId(UUID.randomUUID());
|
||||
|
||||
when(ctx.getSelfId()).thenReturn(ruleNodeId);
|
||||
when(ctx.getTenantId()).thenReturn(tenantId);
|
||||
|
||||
doAnswer((Answer<TbMsg>) invocationOnMock -> {
|
||||
String type = (String) (invocationOnMock.getArguments())[1];
|
||||
EntityId originator = (EntityId) (invocationOnMock.getArguments())[2];
|
||||
TbMsgMetaData metaData = (TbMsgMetaData) (invocationOnMock.getArguments())[3];
|
||||
String data = (String) (invocationOnMock.getArguments())[4];
|
||||
return TbMsg.newMsg(type, originator, metaData.copy(), data);
|
||||
}).when(ctx).newMsg(isNull(), eq(TB_MSG_DEDUPLICATION_TIMEOUT_MSG), nullable(EntityId.class), any(TbMsgMetaData.class), any(String.class));
|
||||
node = spy(new TbMsgDeduplicationNode());
|
||||
config = new TbMsgDeduplicationNodeConfiguration().defaultConfiguration();
|
||||
}
|
||||
|
||||
private void invokeTellSelf(int maxNumberOfInvocation) {
|
||||
invokeTellSelf(maxNumberOfInvocation, false, 0);
|
||||
}
|
||||
|
||||
private void invokeTellSelf(int maxNumberOfInvocation, boolean delayScheduleTimeout, int delayMultiplier) {
|
||||
AtomicLong scheduleTimeout = new AtomicLong(deduplicationInterval);
|
||||
AtomicInteger scheduleCount = new AtomicInteger(0);
|
||||
doAnswer((Answer<Void>) invocationOnMock -> {
|
||||
scheduleCount.getAndIncrement();
|
||||
if (scheduleCount.get() <= maxNumberOfInvocation) {
|
||||
TbMsg msg = (TbMsg) (invocationOnMock.getArguments())[0];
|
||||
executorService.schedule(() -> {
|
||||
try {
|
||||
node.onMsg(ctx, msg);
|
||||
awaitTellSelfLatch.countDown();
|
||||
} catch (ExecutionException | InterruptedException | TbNodeException e) {
|
||||
log.error("Failed to execute tellSelf method call due to: ", e);
|
||||
}
|
||||
}, scheduleTimeout.get(), TimeUnit.SECONDS);
|
||||
if (delayScheduleTimeout) {
|
||||
scheduleTimeout.set(scheduleTimeout.get() * delayMultiplier);
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}).when(ctx).tellSelf(ArgumentMatchers.any(TbMsg.class), ArgumentMatchers.anyLong());
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void destroy() {
|
||||
executorService.shutdown();
|
||||
node.destroy();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void given_100_messages_strategy_first_then_verifyOutput() throws TbNodeException, ExecutionException, InterruptedException {
|
||||
int wantedNumberOfTellSelfInvocation = 2;
|
||||
int msgCount = 100;
|
||||
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
|
||||
invokeTellSelf(wantedNumberOfTellSelfInvocation);
|
||||
|
||||
config.setInterval(deduplicationInterval);
|
||||
config.setMaxPendingMsgs(msgCount);
|
||||
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
|
||||
node.init(ctx, nodeConfiguration);
|
||||
|
||||
DeviceId deviceId = new DeviceId(UUID.randomUUID());
|
||||
long currentTimeMillis = System.currentTimeMillis();
|
||||
|
||||
List<TbMsg> inputMsgs = getTbMsgs(deviceId, msgCount, currentTimeMillis, 500);
|
||||
for (TbMsg msg : inputMsgs) {
|
||||
node.onMsg(ctx, msg);
|
||||
}
|
||||
|
||||
TbMsg msgToReject = createMsg(deviceId, inputMsgs.get(inputMsgs.size() - 1).getMetaDataTs() + 2);
|
||||
node.onMsg(ctx, msgToReject);
|
||||
|
||||
awaitTellSelfLatch.await();
|
||||
|
||||
ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
ArgumentCaptor<Runnable> successCaptor = ArgumentCaptor.forClass(Runnable.class);
|
||||
ArgumentCaptor<Consumer<Throwable>> failureCaptor = ArgumentCaptor.forClass(Consumer.class);
|
||||
|
||||
verify(ctx, times(msgCount)).ack(any());
|
||||
verify(ctx, times(1)).tellFailure(eq(msgToReject), any());
|
||||
verify(node, times(msgCount + wantedNumberOfTellSelfInvocation + 1)).onMsg(eq(ctx), any());
|
||||
verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture());
|
||||
Assertions.assertEquals(inputMsgs.get(0), newMsgCaptor.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void given_100_messages_strategy_last_then_verifyOutput() throws TbNodeException, ExecutionException, InterruptedException {
|
||||
int wantedNumberOfTellSelfInvocation = 2;
|
||||
int msgCount = 100;
|
||||
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
|
||||
invokeTellSelf(wantedNumberOfTellSelfInvocation);
|
||||
|
||||
config.setStrategy(DeduplicationStrategy.LAST);
|
||||
config.setInterval(deduplicationInterval);
|
||||
config.setMaxPendingMsgs(msgCount);
|
||||
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
|
||||
node.init(ctx, nodeConfiguration);
|
||||
|
||||
DeviceId deviceId = new DeviceId(UUID.randomUUID());
|
||||
long currentTimeMillis = System.currentTimeMillis();
|
||||
|
||||
List<TbMsg> inputMsgs = getTbMsgs(deviceId, msgCount, currentTimeMillis, 500);
|
||||
TbMsg msgWithLatestTs = getMsgWithLatestTs(inputMsgs);
|
||||
|
||||
for (TbMsg msg : inputMsgs) {
|
||||
node.onMsg(ctx, msg);
|
||||
}
|
||||
|
||||
TbMsg msgToReject = createMsg(deviceId, inputMsgs.get(inputMsgs.size() - 1).getMetaDataTs() + 2);
|
||||
node.onMsg(ctx, msgToReject);
|
||||
|
||||
awaitTellSelfLatch.await();
|
||||
|
||||
ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
ArgumentCaptor<Runnable> successCaptor = ArgumentCaptor.forClass(Runnable.class);
|
||||
ArgumentCaptor<Consumer<Throwable>> failureCaptor = ArgumentCaptor.forClass(Consumer.class);
|
||||
|
||||
verify(ctx, times(msgCount)).ack(any());
|
||||
verify(ctx, times(1)).tellFailure(eq(msgToReject), any());
|
||||
verify(node, times(msgCount + wantedNumberOfTellSelfInvocation + 1)).onMsg(eq(ctx), any());
|
||||
verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture());
|
||||
Assertions.assertEquals(msgWithLatestTs, newMsgCaptor.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void given_100_messages_strategy_all_then_verifyOutput() throws TbNodeException, ExecutionException, InterruptedException {
|
||||
int wantedNumberOfTellSelfInvocation = 2;
|
||||
int msgCount = 100;
|
||||
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
|
||||
invokeTellSelf(wantedNumberOfTellSelfInvocation);
|
||||
|
||||
config.setInterval(deduplicationInterval);
|
||||
config.setStrategy(DeduplicationStrategy.ALL);
|
||||
config.setOutMsgType(SessionMsgType.POST_ATTRIBUTES_REQUEST.name());
|
||||
config.setQueueName(HIGH_PRIORITY_QUEUE_NAME);
|
||||
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
|
||||
node.init(ctx, nodeConfiguration);
|
||||
|
||||
DeviceId deviceId = new DeviceId(UUID.randomUUID());
|
||||
long currentTimeMillis = System.currentTimeMillis();
|
||||
|
||||
List<TbMsg> inputMsgs = getTbMsgs(deviceId, msgCount, currentTimeMillis, 500);
|
||||
for (TbMsg msg : inputMsgs) {
|
||||
node.onMsg(ctx, msg);
|
||||
}
|
||||
|
||||
awaitTellSelfLatch.await();
|
||||
|
||||
ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
ArgumentCaptor<Runnable> successCaptor = ArgumentCaptor.forClass(Runnable.class);
|
||||
ArgumentCaptor<Consumer<Throwable>> failureCaptor = ArgumentCaptor.forClass(Consumer.class);
|
||||
|
||||
verify(ctx, times(msgCount)).ack(any());
|
||||
verify(node, times(msgCount + wantedNumberOfTellSelfInvocation)).onMsg(eq(ctx), any());
|
||||
verify(ctx, times(1)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture());
|
||||
|
||||
Assertions.assertEquals(1, newMsgCaptor.getAllValues().size());
|
||||
TbMsg outMessage = newMsgCaptor.getAllValues().get(0);
|
||||
Assertions.assertEquals(getMergedData(inputMsgs), outMessage.getData());
|
||||
Assertions.assertEquals(deviceId, outMessage.getOriginator());
|
||||
Assertions.assertEquals(config.getOutMsgType(), outMessage.getType());
|
||||
Assertions.assertEquals(config.getQueueName(), outMessage.getQueueName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void given_100_messages_strategy_all_then_verifyOutput_2_packs() throws TbNodeException, ExecutionException, InterruptedException {
|
||||
int wantedNumberOfTellSelfInvocation = 2;
|
||||
int msgCount = 100;
|
||||
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
|
||||
invokeTellSelf(wantedNumberOfTellSelfInvocation, true, 3);
|
||||
|
||||
config.setInterval(deduplicationInterval);
|
||||
config.setStrategy(DeduplicationStrategy.ALL);
|
||||
config.setOutMsgType(SessionMsgType.POST_ATTRIBUTES_REQUEST.name());
|
||||
config.setQueueName(HIGH_PRIORITY_QUEUE_NAME);
|
||||
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
|
||||
node.init(ctx, nodeConfiguration);
|
||||
|
||||
DeviceId deviceId = new DeviceId(UUID.randomUUID());
|
||||
long currentTimeMillis = System.currentTimeMillis();
|
||||
|
||||
List<TbMsg> firstMsgPack = getTbMsgs(deviceId, msgCount / 2, currentTimeMillis, 500);
|
||||
for (TbMsg msg : firstMsgPack) {
|
||||
node.onMsg(ctx, msg);
|
||||
}
|
||||
long firstPackDeduplicationPackEndTs = firstMsgPack.get(0).getMetaDataTs() + TimeUnit.SECONDS.toMillis(deduplicationInterval);
|
||||
|
||||
List<TbMsg> secondMsgPack = getTbMsgs(deviceId, msgCount / 2, firstPackDeduplicationPackEndTs, 500);
|
||||
for (TbMsg msg : secondMsgPack) {
|
||||
node.onMsg(ctx, msg);
|
||||
}
|
||||
|
||||
awaitTellSelfLatch.await();
|
||||
|
||||
ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
ArgumentCaptor<Runnable> successCaptor = ArgumentCaptor.forClass(Runnable.class);
|
||||
ArgumentCaptor<Consumer<Throwable>> failureCaptor = ArgumentCaptor.forClass(Consumer.class);
|
||||
|
||||
verify(ctx, times(msgCount)).ack(any());
|
||||
verify(node, times(msgCount + wantedNumberOfTellSelfInvocation)).onMsg(eq(ctx), any());
|
||||
verify(ctx, times(2)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture());
|
||||
|
||||
List<TbMsg> resultMsgs = newMsgCaptor.getAllValues();
|
||||
Assertions.assertEquals(2, resultMsgs.size());
|
||||
|
||||
TbMsg firstMsg = resultMsgs.get(0);
|
||||
Assertions.assertEquals(getMergedData(firstMsgPack), firstMsg.getData());
|
||||
Assertions.assertEquals(deviceId, firstMsg.getOriginator());
|
||||
Assertions.assertEquals(config.getOutMsgType(), firstMsg.getType());
|
||||
Assertions.assertEquals(config.getQueueName(), firstMsg.getQueueName());
|
||||
|
||||
TbMsg secondMsg = resultMsgs.get(1);
|
||||
Assertions.assertEquals(getMergedData(secondMsgPack), secondMsg.getData());
|
||||
Assertions.assertEquals(deviceId, secondMsg.getOriginator());
|
||||
Assertions.assertEquals(config.getOutMsgType(), secondMsg.getType());
|
||||
Assertions.assertEquals(config.getQueueName(), secondMsg.getQueueName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void given_100_messages_strategy_last_then_verifyOutput_2_packs() throws TbNodeException, ExecutionException, InterruptedException {
|
||||
int wantedNumberOfTellSelfInvocation = 2;
|
||||
int msgCount = 100;
|
||||
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
|
||||
invokeTellSelf(wantedNumberOfTellSelfInvocation, true, 3);
|
||||
|
||||
config.setInterval(deduplicationInterval);
|
||||
config.setStrategy(DeduplicationStrategy.LAST);
|
||||
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
|
||||
node.init(ctx, nodeConfiguration);
|
||||
|
||||
DeviceId deviceId = new DeviceId(UUID.randomUUID());
|
||||
long currentTimeMillis = System.currentTimeMillis();
|
||||
|
||||
List<TbMsg> firstMsgPack = getTbMsgs(deviceId, msgCount / 2, currentTimeMillis, 500);
|
||||
for (TbMsg msg : firstMsgPack) {
|
||||
node.onMsg(ctx, msg);
|
||||
}
|
||||
long firstPackDeduplicationPackEndTs = firstMsgPack.get(0).getMetaDataTs() + TimeUnit.SECONDS.toMillis(deduplicationInterval);
|
||||
TbMsg msgWithLatestTsInFirstPack = getMsgWithLatestTs(firstMsgPack);
|
||||
|
||||
List<TbMsg> secondMsgPack = getTbMsgs(deviceId, msgCount / 2, firstPackDeduplicationPackEndTs, 500);
|
||||
for (TbMsg msg : secondMsgPack) {
|
||||
node.onMsg(ctx, msg);
|
||||
}
|
||||
TbMsg msgWithLatestTsInSecondPack = getMsgWithLatestTs(secondMsgPack);
|
||||
|
||||
awaitTellSelfLatch.await();
|
||||
|
||||
ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
ArgumentCaptor<Runnable> successCaptor = ArgumentCaptor.forClass(Runnable.class);
|
||||
ArgumentCaptor<Consumer<Throwable>> failureCaptor = ArgumentCaptor.forClass(Consumer.class);
|
||||
|
||||
verify(ctx, times(msgCount)).ack(any());
|
||||
verify(node, times(msgCount + wantedNumberOfTellSelfInvocation)).onMsg(eq(ctx), any());
|
||||
verify(ctx, times(2)).enqueueForTellNext(newMsgCaptor.capture(), eq(TbRelationTypes.SUCCESS), successCaptor.capture(), failureCaptor.capture());
|
||||
|
||||
List<TbMsg> resultMsgs = newMsgCaptor.getAllValues();
|
||||
Assertions.assertEquals(2, resultMsgs.size());
|
||||
Assertions.assertTrue(resultMsgs.contains(msgWithLatestTsInFirstPack));
|
||||
Assertions.assertTrue(resultMsgs.contains(msgWithLatestTsInSecondPack));
|
||||
}
|
||||
|
||||
private TbMsg getMsgWithLatestTs(List<TbMsg> firstMsgPack) {
|
||||
int indexOfLastMsgInArray = firstMsgPack.size() - 1;
|
||||
int indexToSetMaxTs = new Random().nextInt(indexOfLastMsgInArray) + 1;
|
||||
TbMsg currentMaxTsMsg = firstMsgPack.get(indexOfLastMsgInArray);
|
||||
TbMsg newLastMsgOfArray = firstMsgPack.get(indexToSetMaxTs);
|
||||
firstMsgPack.set(indexOfLastMsgInArray, newLastMsgOfArray);
|
||||
firstMsgPack.set(indexToSetMaxTs, currentMaxTsMsg);
|
||||
return currentMaxTsMsg;
|
||||
}
|
||||
|
||||
private List<TbMsg> getTbMsgs(DeviceId deviceId, int msgCount, long currentTimeMillis, int initTsStep) {
|
||||
List<TbMsg> inputMsgs = new ArrayList<>();
|
||||
var ts = currentTimeMillis + initTsStep;
|
||||
for (int i = 0; i < msgCount; i++) {
|
||||
inputMsgs.add(createMsg(deviceId, ts));
|
||||
ts += 2;
|
||||
}
|
||||
return inputMsgs;
|
||||
}
|
||||
|
||||
private TbMsg createMsg(DeviceId deviceId, long ts) {
|
||||
ObjectNode dataNode = JacksonUtil.newObjectNode();
|
||||
dataNode.put("deviceId", deviceId.getId().toString());
|
||||
TbMsgMetaData metaData = new TbMsgMetaData();
|
||||
metaData.putValue("ts", String.valueOf(ts));
|
||||
return TbMsg.newMsg(
|
||||
MAIN_QUEUE_NAME,
|
||||
SessionMsgType.POST_TELEMETRY_REQUEST.name(),
|
||||
deviceId,
|
||||
metaData,
|
||||
JacksonUtil.toString(dataNode));
|
||||
}
|
||||
|
||||
private String getMergedData(List<TbMsg> msgs) {
|
||||
ArrayNode mergedData = JacksonUtil.OBJECT_MAPPER.createArrayNode();
|
||||
msgs.forEach(msg -> {
|
||||
ObjectNode msgNode = JacksonUtil.newObjectNode();
|
||||
msgNode.set("msg", JacksonUtil.toJsonNode(msg.getData()));
|
||||
msgNode.set("metadata", JacksonUtil.valueToTree(msg.getMetaData().getData()));
|
||||
mergedData.add(msgNode);
|
||||
});
|
||||
return JacksonUtil.toString(mergedData);
|
||||
}
|
||||
|
||||
}
|
||||
@ -188,7 +188,7 @@ export default abstract class LeafletMap {
|
||||
entities = this.datasources.filter(pData => !this.isValidPolygonPosition(pData));
|
||||
break;
|
||||
case 'Marker':
|
||||
entities = this.datasources.filter(mData => !this.convertPosition(mData));
|
||||
entities = this.datasources.filter(mData => !this.extractPosition(mData));
|
||||
break;
|
||||
case 'Circle':
|
||||
entities = this.datasources.filter(mData => !this.isValidCircle(mData));
|
||||
@ -616,16 +616,29 @@ export default abstract class LeafletMap {
|
||||
}
|
||||
}
|
||||
|
||||
convertPosition(expression: object): L.LatLng {
|
||||
if (!expression) {
|
||||
extractPosition(data: FormattedData): {x: number, y: number} {
|
||||
if (!data) {
|
||||
return null;
|
||||
}
|
||||
const lat = expression[this.options.latKeyName];
|
||||
const lng = expression[this.options.lngKeyName];
|
||||
const lat = data[this.options.latKeyName];
|
||||
const lng = data[this.options.lngKeyName];
|
||||
if (!isDefinedAndNotNull(lat) || isString(lat) || isNaN(lat) || !isDefinedAndNotNull(lng) || isString(lng) || isNaN(lng)) {
|
||||
return null;
|
||||
}
|
||||
return L.latLng(lat, lng) as L.LatLng;
|
||||
return {x: lat, y: lng};
|
||||
}
|
||||
|
||||
positionToLatLng(position: {x: number, y: number}): L.LatLng {
|
||||
return L.latLng(position.x, position.y) as L.LatLng;
|
||||
}
|
||||
|
||||
convertPosition(data: FormattedData, dsData: FormattedData[]): L.LatLng {
|
||||
const position = this.extractPosition(data);
|
||||
if (position) {
|
||||
return this.positionToLatLng(position);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
convertPositionPolygon(expression: (LatLngTuple | LatLngTuple[] | LatLngTuple[][])[]) {
|
||||
@ -707,7 +720,7 @@ export default abstract class LeafletMap {
|
||||
if (this.options.draggableMarker && !this.options.hideDrawControlButton && !this.options.hideAllControlButton) {
|
||||
let foundEntityWithoutLocation = false;
|
||||
for (const mData of formattedData) {
|
||||
const position = this.convertPosition(mData);
|
||||
const position = this.extractPosition(mData);
|
||||
if (!position) {
|
||||
foundEntityWithoutLocation = true;
|
||||
} else if (!!position) {
|
||||
@ -836,7 +849,7 @@ export default abstract class LeafletMap {
|
||||
|
||||
// Markers
|
||||
updateMarkers(markersData: FormattedData[], updateBounds = true, callback?) {
|
||||
const rawMarkers = markersData.filter(mdata => !!this.convertPosition(mdata));
|
||||
const rawMarkers = markersData.filter(mdata => !!this.extractPosition(mdata));
|
||||
const toDelete = new Set(Array.from(this.markers.keys()));
|
||||
const createdMarkers: Marker[] = [];
|
||||
const updatedMarkers: Marker[] = [];
|
||||
@ -900,7 +913,7 @@ export default abstract class LeafletMap {
|
||||
|
||||
private createMarker(key: string, data: FormattedData, dataSources: FormattedData[], settings: Partial<WidgetMarkersSettings>,
|
||||
updateBounds = true, callback?, snappable = false): Marker {
|
||||
const newMarker = new Marker(this, this.convertPosition(data), settings, data, dataSources, this.dragMarker, snappable);
|
||||
const newMarker = new Marker(this, this.convertPosition(data, dataSources), settings, data, dataSources, this.dragMarker, snappable);
|
||||
if (callback) {
|
||||
newMarker.leafletMarker.on('click', () => {
|
||||
callback(data, true);
|
||||
@ -912,13 +925,16 @@ export default abstract class LeafletMap {
|
||||
this.markers.set(key, newMarker);
|
||||
if (!this.options.useClusterMarkers) {
|
||||
this.map.addLayer(newMarker.leafletMarker);
|
||||
if (this.map.pm.globalDragModeEnabled() && newMarker.leafletMarker.pm) {
|
||||
newMarker.leafletMarker.pm.enableLayerDrag();
|
||||
}
|
||||
}
|
||||
return newMarker;
|
||||
}
|
||||
|
||||
private updateMarker(key: string, data: FormattedData, dataSources: FormattedData[], settings: Partial<WidgetMarkersSettings>): Marker {
|
||||
const marker: Marker = this.markers.get(key);
|
||||
const location = this.convertPosition(data);
|
||||
const location = this.convertPosition(data, dataSources);
|
||||
marker.updateMarkerPosition(location);
|
||||
marker.setDataSources(data, dataSources);
|
||||
if (settings.showTooltip) {
|
||||
@ -961,12 +977,12 @@ export default abstract class LeafletMap {
|
||||
for (const pointsList of pointsData) {
|
||||
for (let tsIndex = 0; tsIndex < pointsList.length; tsIndex++) {
|
||||
const pdata = pointsList[tsIndex];
|
||||
if (!!this.convertPosition(pdata)) {
|
||||
if (!!this.extractPosition(pdata)) {
|
||||
const dsData = pointsData.map(ds => ds[tsIndex]);
|
||||
if (this.options.useColorPointFunction) {
|
||||
pointColor = safeExecute(this.options.parsedColorPointFunction, [pdata, dsData, pdata.dsIndex]);
|
||||
}
|
||||
const point = L.circleMarker(this.convertPosition(pdata), {
|
||||
const point = L.circleMarker(this.convertPosition(pdata, dsData), {
|
||||
color: pointColor,
|
||||
radius: this.options.pointSize
|
||||
});
|
||||
@ -1014,7 +1030,7 @@ export default abstract class LeafletMap {
|
||||
createPolyline(data: FormattedData, tsData: FormattedData[], dsData: FormattedData[],
|
||||
settings: Partial<WidgetPolylineSettings>, updateBounds = true) {
|
||||
const poly = new Polyline(this.map,
|
||||
tsData.map(el => this.convertPosition(el)).filter(el => !!el), data, dsData, settings);
|
||||
tsData.map(el => this.extractPosition(el)).filter(el => !!el).map(el => this.positionToLatLng(el)), data, dsData, settings);
|
||||
if (updateBounds) {
|
||||
const bounds = poly.leafletPoly.getBounds();
|
||||
this.fitBounds(bounds);
|
||||
@ -1026,7 +1042,8 @@ export default abstract class LeafletMap {
|
||||
settings: Partial<WidgetPolylineSettings>, updateBounds = true) {
|
||||
const poly = this.polylines.get(data.entityName);
|
||||
const oldBounds = poly.leafletPoly.getBounds();
|
||||
poly.updatePolyline(tsData.map(el => this.convertPosition(el)).filter(el => !!el), data, dsData, settings);
|
||||
poly.updatePolyline(tsData.map(el => this.extractPosition(el)).filter(el => !!el)
|
||||
.map(el => this.positionToLatLng(el)), data, dsData, settings);
|
||||
const newBounds = poly.leafletPoly.getBounds();
|
||||
if (updateBounds && oldBounds.toBBoxString() !== newBounds.toBBoxString()) {
|
||||
this.fitBounds(newBounds);
|
||||
@ -1106,6 +1123,9 @@ export default abstract class LeafletMap {
|
||||
const bounds = polygon.leafletPoly.getBounds();
|
||||
this.fitBounds(bounds);
|
||||
}
|
||||
if (this.map.pm.globalDragModeEnabled() && polygon.leafletPoly.pm) {
|
||||
polygon.leafletPoly.pm.enableLayerDrag();
|
||||
}
|
||||
this.polygons.set(polyData.entityName, polygon);
|
||||
}
|
||||
|
||||
@ -1206,6 +1226,9 @@ export default abstract class LeafletMap {
|
||||
const bounds = circle.leafletCircle.getBounds();
|
||||
this.fitBounds(bounds);
|
||||
}
|
||||
if (this.map.pm.globalDragModeEnabled() && circle.leafletCircle.pm) {
|
||||
circle.leafletCircle.pm.enableLayerDrag();
|
||||
}
|
||||
this.circles.set(data.entityName, circle);
|
||||
}
|
||||
|
||||
|
||||
@ -48,8 +48,10 @@ export interface CircleData {
|
||||
}
|
||||
|
||||
export type GenericFunction = (data: FormattedData, dsData: FormattedData[], dsIndex: number) => string;
|
||||
export type MarkerImageFunction = (data: FormattedData, dsData: FormattedData[], dsIndex: number) => MarkerImageInfo;
|
||||
export type PosFuncton = (origXPos, origYPos) => { x, y };
|
||||
export type MarkerImageFunction = (data: FormattedData, markerImages: string[],
|
||||
dsData: FormattedData[], dsIndex: number) => MarkerImageInfo;
|
||||
export type PosFunction = (origXPos, origYPos, data: FormattedData,
|
||||
dsData: FormattedData[], dsIndex: number, aspect: number) => { x: number, y: number };
|
||||
export type MarkerIconReadyFunction = (icon: MarkerIconInfo) => void;
|
||||
|
||||
export enum GoogleMapType {
|
||||
|
||||
@ -20,7 +20,7 @@ import {
|
||||
CircleData,
|
||||
defaultImageMapProviderSettings,
|
||||
MapImage,
|
||||
PosFuncton,
|
||||
PosFunction,
|
||||
WidgetUnitedMapSettings
|
||||
} from '../map-models';
|
||||
import { Observable, ReplaySubject } from 'rxjs';
|
||||
@ -30,7 +30,7 @@ import {
|
||||
calculateNewPointCoordinate
|
||||
} from '@home/components/widget/lib/maps/common-maps-utils';
|
||||
import { WidgetContext } from '@home/models/widget-component.models';
|
||||
import { DataSet, DatasourceType, widgetType } from '@shared/models/widget.models';
|
||||
import { DataSet, DatasourceType, FormattedData, widgetType } from '@shared/models/widget.models';
|
||||
import { DataKeyType } from '@shared/models/telemetry/telemetry.models';
|
||||
import { WidgetSubscriptionOptions } from '@core/api/widget-api.models';
|
||||
import { isDefinedAndNotNull, isEmptyStr, isNotEmptyStr, parseFunction } from '@core/utils';
|
||||
@ -45,11 +45,12 @@ export class ImageMap extends LeafletMap {
|
||||
width = 0;
|
||||
height = 0;
|
||||
imageUrl: string;
|
||||
posFunction: PosFuncton;
|
||||
posFunction: PosFunction;
|
||||
|
||||
constructor(ctx: WidgetContext, $container: HTMLElement, options: WidgetUnitedMapSettings) {
|
||||
super(ctx, $container, options);
|
||||
this.posFunction = parseFunction(options.posFunction, ['origXPos', 'origYPos']) as PosFuncton;
|
||||
this.posFunction = parseFunction(options.posFunction,
|
||||
['origXPos', 'origYPos', 'data', 'dsData', 'dsIndex', 'aspect']) as PosFunction;
|
||||
this.mapImage(options).subscribe((mapImage) => {
|
||||
this.imageUrl = mapImage.imageUrl;
|
||||
this.aspect = mapImage.aspect;
|
||||
@ -248,16 +249,32 @@ export class ImageMap extends LeafletMap {
|
||||
}
|
||||
}
|
||||
|
||||
convertPosition(expression): L.LatLng {
|
||||
const xPos = expression[this.options.xPosKeyName];
|
||||
const yPos = expression[this.options.yPosKeyName];
|
||||
extractPosition(data: FormattedData): {x: number, y: number} {
|
||||
if (!data) {
|
||||
return null;
|
||||
}
|
||||
const xPos = data[this.options.xPosKeyName];
|
||||
const yPos = data[this.options.yPosKeyName];
|
||||
if (!isDefinedAndNotNull(xPos) || isEmptyStr(xPos) || isNaN(xPos) || !isDefinedAndNotNull(yPos) || isEmptyStr(yPos) || isNaN(yPos)) {
|
||||
return null;
|
||||
}
|
||||
Object.assign(expression, this.posFunction(xPos, yPos));
|
||||
return {x: xPos, y: yPos};
|
||||
}
|
||||
|
||||
positionToLatLng(position: {x: number, y: number}): L.LatLng {
|
||||
return this.pointToLatLng(
|
||||
expression.x * this.width,
|
||||
expression.y * this.height);
|
||||
position.x * this.width,
|
||||
position.y * this.height);
|
||||
}
|
||||
|
||||
convertPosition(data, dsData: FormattedData[]): L.LatLng {
|
||||
const position = this.extractPosition(data);
|
||||
if (position) {
|
||||
const converted = this.posFunction(position.x, position.y, data, dsData, data.dsIndex, this.aspect) || {x: 0, y: 0};
|
||||
return this.positionToLatLng(converted);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
convertPositionPolygon(expression: (LatLngTuple | LatLngTuple[] | LatLngTuple[][])[]){
|
||||
|
||||
@ -32,7 +32,7 @@
|
||||
formControlName="posFunction"
|
||||
minHeight="100px"
|
||||
[globalVariables]="functionScopeVariables"
|
||||
[functionArgs]="['origXPos', 'origYPos']"
|
||||
[functionArgs]="['origXPos', 'origYPos', 'data', 'dsData', 'dsIndex', 'aspect']"
|
||||
functionTitle="{{ 'widgets.maps.position-function' | translate }}"
|
||||
helpId="widget/lib/map/position_fn">
|
||||
</tb-js-func>
|
||||
|
||||
@ -3,14 +3,18 @@
|
||||
<div class="divider"></div>
|
||||
<br/>
|
||||
|
||||
*function (origXPos, origYPos): {x: number, y: number}*
|
||||
*function (origXPos, origYPos, data, dsData, dsIndex, aspect): {x: number, y: number}*
|
||||
|
||||
A JavaScript function used to convert original relative x, y coordinates of the marker.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
- **origXPos:** <code>number</code> - original relative x coordinate as double from 0 to 1;
|
||||
- **origYPos:** <code>number</code> - original relative y coordinate as double from 0 to 1;
|
||||
<ul>
|
||||
<li><b>origXPos:</b> <code>number</code> - original relative x coordinate as double from 0 to 1.</li>
|
||||
<li><b>origYPos:</b> <code>number</code> - original relative y coordinate as double from 0 to 1.</li>
|
||||
{% include widget/lib/map/map_fn_args %}
|
||||
<li><b>aspect:</b> <code>number</code> - image map aspect ratio.</li>
|
||||
</ul>
|
||||
|
||||
**Returns:**
|
||||
|
||||
@ -37,5 +41,25 @@ return {x: origXPos / 2, y: origYPos / 2};
|
||||
{:copy-code}
|
||||
```
|
||||
|
||||
* Detect markers with same positions and place them with minimum overlap:
|
||||
|
||||
```javascript
|
||||
var xPos = data.xPos;
|
||||
var yPos = data.yPos;
|
||||
var locationGroup = dsData.filter((item) => item.xPos === xPos && item.yPos === yPos);
|
||||
if (locationGroup.length > 1) {
|
||||
const count = locationGroup.length;
|
||||
const index = locationGroup.indexOf(data);
|
||||
const radius = 0.035;
|
||||
const angle = (360 / count) * index - 45;
|
||||
const x = xPos + radius * Math.sin(angle*Math.PI/180) / aspect;
|
||||
const y = yPos + radius * Math.cos(angle*Math.PI/180);
|
||||
return {x: x, y: y};
|
||||
} else {
|
||||
return {x: xPos, y: yPos};
|
||||
}
|
||||
{:copy-code}
|
||||
```
|
||||
|
||||
<br>
|
||||
<br>
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user