InMemoryQueue - fix racecondition

This commit is contained in:
vparomskiy 2018-05-11 12:58:01 +03:00
parent 61baafe6fb
commit 9873aaff5c
8 changed files with 22 additions and 28 deletions

View File

@ -22,7 +22,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.dao.queue.MsgQueue; import org.thingsboard.server.dao.queue.MsgQueue;
@ -30,12 +29,7 @@ import org.thingsboard.server.dao.util.SqlDao;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import java.util.ArrayList; import java.util.*;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -72,13 +66,13 @@ public class InMemoryMsgQueue implements MsgQueue {
@Override @Override
public ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusterPartition) { public ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusterPartition) {
if (pendingMsgCount.get() < maxSize) { if (pendingMsgCount.incrementAndGet() < maxSize) {
return queueExecutor.submit(() -> { return queueExecutor.submit(() -> {
data.computeIfAbsent(new InMemoryMsgKey(nodeId, clusterPartition), key -> new HashMap<>()).put(msg.getId(), msg); data.computeIfAbsent(new InMemoryMsgKey(nodeId, clusterPartition), key -> new HashMap<>()).put(msg.getId(), msg);
pendingMsgCount.incrementAndGet();
return null; return null;
}); });
} else { } else {
pendingMsgCount.decrementAndGet();
return Futures.immediateFailedFuture(new RuntimeException("Message queue is full!")); return Futures.immediateFailedFuture(new RuntimeException("Message queue is full!"));
} }
} }

View File

@ -30,7 +30,7 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
name = "log", name = "log",
configClazz = TbLogNodeConfiguration.class, configClazz = TbLogNodeConfiguration.class,
nodeDescription = "Log incoming messages using JS script for transformation Message into String", nodeDescription = "Log incoming messages using JS script for transformation Message into String",
nodeDetails = "Transform incoming Message with configured JS condition to String and log final value. " + nodeDetails = "Transform incoming Message with configured JS function to String and log final value into Thingsboard log file. " +
"Message payload can be accessed via <code>msg</code> property. For example <code>'temperature = ' + msg.temperature ;</code>" + "Message payload can be accessed via <code>msg</code> property. For example <code>'temperature = ' + msg.temperature ;</code>" +
"Message metadata can be accessed via <code>metadata</code> property. For example <code>'name = ' + metadata.customerName;</code>", "Message metadata can be accessed via <code>metadata</code> property. For example <code>'name = ' + metadata.customerName;</code>",
uiResources = {"static/rulenode/rulenode-core-config.js"}, uiResources = {"static/rulenode/rulenode-core-config.js"},

View File

@ -38,7 +38,7 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
name = "generator", name = "generator",
configClazz = TbMsgGeneratorNodeConfiguration.class, configClazz = TbMsgGeneratorNodeConfiguration.class,
nodeDescription = "Periodically generates messages", nodeDescription = "Periodically generates messages",
nodeDetails = "Generates messages with configurable period. ", nodeDetails = "Generates messages with configurable period. Javascript function used fore message generation.",
inEnabled = false, inEnabled = false,
uiResources = {"static/rulenode/rulenode-core-config.js", "static/rulenode/rulenode-core-config.css"}, uiResources = {"static/rulenode/rulenode-core-config.js", "static/rulenode/rulenode-core-config.css"},
configDirective = "tbActionNodeGeneratorConfig", configDirective = "tbActionNodeGeneratorConfig",

View File

@ -1,12 +1,12 @@
/** /**
* Copyright © 2016-2018 The Thingsboard Authors * Copyright © 2016-2018 The Thingsboard Authors
* <p> *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* <p> *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* <p> *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

View File

@ -1,12 +1,12 @@
/** /**
* Copyright © 2016-2018 The Thingsboard Authors * Copyright © 2016-2018 The Thingsboard Authors
* <p> *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* <p> *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* <p> *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

View File

@ -1,12 +1,12 @@
/** /**
* Copyright © 2016-2018 The Thingsboard Authors * Copyright © 2016-2018 The Thingsboard Authors
* <p> *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* <p> *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* <p> *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

View File

@ -26,7 +26,7 @@ import org.thingsboard.server.common.msg.TbMsg;
name = "script", name = "script",
configClazz = TbTransformMsgNodeConfiguration.class, configClazz = TbTransformMsgNodeConfiguration.class,
nodeDescription = "Change Message payload, Metadata or Message type using JavaScript", nodeDescription = "Change Message payload, Metadata or Message type using JavaScript",
nodeDetails = "JavaScript function receive 3 input parameters.<br/> " + nodeDetails = "JavaScript function receive 3 input parameters <br/> " +
"<code>metadata</code> - is a Message metadata.<br/>" + "<code>metadata</code> - is a Message metadata.<br/>" +
"<code>msg</code> - is a Message payload.<br/>" + "<code>msg</code> - is a Message payload.<br/>" +
"<code>msgType</code> - is a Message type.<br/>" + "<code>msgType</code> - is a Message type.<br/>" +

View File

@ -1,12 +1,12 @@
/** /**
* Copyright © 2016-2018 The Thingsboard Authors * Copyright © 2016-2018 The Thingsboard Authors
* <p> *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* <p> *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* <p> *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.