Merge with master
This commit is contained in:
commit
4d03c51743
@ -17,7 +17,7 @@
|
||||
},
|
||||
"type": "org.thingsboard.rule.engine.filter.TbJsFilterNode",
|
||||
"name": "Is Thermostat?",
|
||||
"debugMode": true,
|
||||
"debugMode": false,
|
||||
"configuration": {
|
||||
"jsScript": "return msg.id.entityType === \"DEVICE\" && msg.type === \"thermostat\";"
|
||||
}
|
||||
@ -113,7 +113,7 @@
|
||||
},
|
||||
"type": "org.thingsboard.rule.engine.action.TbCreateRelationNode",
|
||||
"name": "Relate to Asset",
|
||||
"debugMode": true,
|
||||
"debugMode": false,
|
||||
"configuration": {
|
||||
"direction": "FROM",
|
||||
"relationType": "ToAlarmPropagationAsset",
|
||||
|
||||
@ -81,7 +81,7 @@
|
||||
},
|
||||
"type": "org.thingsboard.rule.engine.filter.TbJsSwitchNode",
|
||||
"name": "Check Alarms",
|
||||
"debugMode": true,
|
||||
"debugMode": false,
|
||||
"configuration": {
|
||||
"jsScript": "var relations = [];\nif(metadata[\"ss_alarmTemperature\"] === \"true\"){\n if(msg.temperature > metadata[\"ss_thresholdTemperature\"]){\n relations.push(\"NewTempAlarm\");\n } else {\n relations.push(\"ClearTempAlarm\");\n }\n}\nif(metadata[\"ss_alarmHumidity\"] === \"true\"){\n if(msg.humidity < metadata[\"ss_thresholdHumidity\"]){\n relations.push(\"NewHumidityAlarm\");\n } else {\n relations.push(\"ClearHumidityAlarm\");\n }\n}\n\nreturn relations;"
|
||||
}
|
||||
@ -93,7 +93,7 @@
|
||||
},
|
||||
"type": "org.thingsboard.rule.engine.metadata.TbGetAttributesNode",
|
||||
"name": "Fetch Configuration",
|
||||
"debugMode": true,
|
||||
"debugMode": false,
|
||||
"configuration": {
|
||||
"clientAttributeNames": [],
|
||||
"sharedAttributeNames": [],
|
||||
|
||||
@ -163,7 +163,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
|
||||
String dispatcherName = tenantId.getId().equals(EntityId.NULL_UUID) ?
|
||||
DefaultActorService.SYSTEM_RULE_DISPATCHER_NAME : DefaultActorService.TENANT_RULE_DISPATCHER_NAME;
|
||||
return context.actorOf(
|
||||
Props.create(new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleNode.getId()))
|
||||
Props.create(new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleNode.getName(), ruleNode.getId()))
|
||||
.withDispatcher(dispatcherName), ruleNode.getId().toString());
|
||||
}
|
||||
|
||||
@ -200,7 +200,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
|
||||
log.trace("[{}][{}] Processing message [{}]: {}", entityId, firstId, msg.getId(), msg);
|
||||
if (envelope.getRelationTypes() == null || envelope.getRelationTypes().isEmpty()) {
|
||||
try {
|
||||
checkActive();
|
||||
checkActive(envelope.getTbMsg());
|
||||
RuleNodeId targetId = msg.getRuleNodeId();
|
||||
RuleNodeCtx targetCtx;
|
||||
if (targetId == null) {
|
||||
@ -216,6 +216,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
|
||||
log.trace("[{}][{}] Rule node does not exist. Probably old message", entityId, targetId);
|
||||
msg.getCallback().onSuccess();
|
||||
}
|
||||
} catch (RuleNodeException rne) {
|
||||
envelope.getTbMsg().getCallback().onFailure(rne);
|
||||
} catch (Exception e) {
|
||||
envelope.getTbMsg().getCallback().onFailure(new RuleEngineException(e.getMessage()));
|
||||
}
|
||||
@ -225,11 +227,15 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
|
||||
}
|
||||
|
||||
void onRuleChainToRuleChainMsg(RuleChainToRuleChainMsg envelope) {
|
||||
checkActive();
|
||||
if (firstNode != null) {
|
||||
pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType());
|
||||
} else {
|
||||
envelope.getMsg().getCallback().onSuccess();
|
||||
try {
|
||||
checkActive(envelope.getMsg());
|
||||
if (firstNode != null) {
|
||||
pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType());
|
||||
} else {
|
||||
envelope.getMsg().getCallback().onSuccess();
|
||||
}
|
||||
} catch (RuleNodeException e) {
|
||||
log.debug("Rule Chain is not active. Current state [{}] for processor [{}][{}] tenant [{}]", state, entityId.getEntityType(), entityId, tenantId);
|
||||
}
|
||||
}
|
||||
|
||||
@ -239,7 +245,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
|
||||
|
||||
private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, Set<String> relationTypes, String failureMessage) {
|
||||
try {
|
||||
checkActive();
|
||||
checkActive(msg);
|
||||
EntityId entityId = msg.getOriginator();
|
||||
TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);
|
||||
List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream()
|
||||
@ -272,6 +278,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
|
||||
putToQueue(tpi, msg, callbackWrapper, target);
|
||||
}
|
||||
}
|
||||
} catch (RuleNodeException rne) {
|
||||
msg.getCallback().onFailure(rne);
|
||||
} catch (Exception e) {
|
||||
msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage()));
|
||||
}
|
||||
@ -333,4 +341,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RuleNodeException getInactiveException() {
|
||||
RuleNode firstRuleNode = firstNode != null ? firstNode.getSelf() : null;
|
||||
return new RuleNodeException("Rule Chain is not active! Failed to initialize.", ruleChainName, firstRuleNode);
|
||||
}
|
||||
}
|
||||
|
||||
@ -27,12 +27,14 @@ import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
|
||||
|
||||
public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessageProcessor> {
|
||||
|
||||
private final String ruleChainName;
|
||||
private final RuleChainId ruleChainId;
|
||||
|
||||
private RuleNodeActor(ActorSystemContext systemContext, TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
|
||||
private RuleNodeActor(ActorSystemContext systemContext, TenantId tenantId, RuleChainId ruleChainId, String ruleChainName, RuleNodeId ruleNodeId) {
|
||||
super(systemContext, tenantId, ruleNodeId);
|
||||
this.ruleChainName = ruleChainName;
|
||||
this.ruleChainId = ruleChainId;
|
||||
setProcessor(new RuleNodeActorMessageProcessor(tenantId, ruleChainId, ruleNodeId, systemContext,
|
||||
setProcessor(new RuleNodeActorMessageProcessor(tenantId, this.ruleChainName, ruleNodeId, systemContext,
|
||||
context().parent(), context().self()));
|
||||
}
|
||||
|
||||
@ -96,19 +98,21 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
|
||||
|
||||
private final TenantId tenantId;
|
||||
private final RuleChainId ruleChainId;
|
||||
private final String ruleChainName;
|
||||
private final RuleNodeId ruleNodeId;
|
||||
|
||||
public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
|
||||
public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleChainId ruleChainId, String ruleChainName, RuleNodeId ruleNodeId) {
|
||||
super(context);
|
||||
this.tenantId = tenantId;
|
||||
this.ruleChainId = ruleChainId;
|
||||
this.ruleChainName = ruleChainName;
|
||||
this.ruleNodeId = ruleNodeId;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public RuleNodeActor create() throws Exception {
|
||||
return new RuleNodeActor(context, tenantId, ruleChainId, ruleNodeId);
|
||||
return new RuleNodeActor(context, tenantId, ruleChainId, ruleChainName, ruleNodeId);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -17,37 +17,33 @@ package org.thingsboard.server.actors.ruleChain;
|
||||
|
||||
import akka.actor.ActorContext;
|
||||
import akka.actor.ActorRef;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNode;
|
||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||
import org.thingsboard.server.actors.ActorSystemContext;
|
||||
import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.id.RuleNodeId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
|
||||
import org.thingsboard.server.common.data.rule.RuleNode;
|
||||
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
|
||||
import org.thingsboard.server.dao.rule.RuleChainService;
|
||||
import org.thingsboard.server.common.msg.queue.RuleNodeException;
|
||||
|
||||
/**
|
||||
* @author Andrew Shvayka
|
||||
*/
|
||||
public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNodeId> {
|
||||
|
||||
private final ActorRef parent;
|
||||
private final String ruleChainName;
|
||||
private final ActorRef self;
|
||||
private final RuleChainService service;
|
||||
private RuleNode ruleNode;
|
||||
private TbNode tbNode;
|
||||
private DefaultTbContext defaultCtx;
|
||||
|
||||
RuleNodeActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId, ActorSystemContext systemContext
|
||||
RuleNodeActorMessageProcessor(TenantId tenantId, String ruleChainName, RuleNodeId ruleNodeId, ActorSystemContext systemContext
|
||||
, ActorRef parent, ActorRef self) {
|
||||
super(systemContext, tenantId, ruleNodeId);
|
||||
this.parent = parent;
|
||||
this.ruleChainName = ruleChainName;
|
||||
this.self = self;
|
||||
this.service = systemContext.getRuleChainService();
|
||||
this.ruleNode = systemContext.getRuleChainService().findRuleNodeById(tenantId, entityId);
|
||||
this.defaultCtx = new DefaultTbContext(systemContext, new RuleNodeCtx(tenantId, parent, self, ruleNode));
|
||||
}
|
||||
@ -63,8 +59,8 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
|
||||
@Override
|
||||
public void onUpdate(ActorContext context) throws Exception {
|
||||
RuleNode newRuleNode = systemContext.getRuleChainService().findRuleNodeById(tenantId, entityId);
|
||||
boolean restartRequired = !(ruleNode.getType().equals(newRuleNode.getType())
|
||||
&& ruleNode.getConfiguration().equals(newRuleNode.getConfiguration()));
|
||||
boolean restartRequired = state != ComponentLifecycleState.ACTIVE ||
|
||||
!(ruleNode.getType().equals(newRuleNode.getType()) && ruleNode.getConfiguration().equals(newRuleNode.getConfiguration()));
|
||||
this.ruleNode = newRuleNode;
|
||||
this.defaultCtx.updateSelf(newRuleNode);
|
||||
if (restartRequired) {
|
||||
@ -91,7 +87,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
|
||||
}
|
||||
|
||||
public void onRuleToSelfMsg(RuleNodeToSelfMsg msg) throws Exception {
|
||||
checkActive();
|
||||
checkActive(msg.getMsg());
|
||||
if (ruleNode.isDebugMode()) {
|
||||
systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), "Self");
|
||||
}
|
||||
@ -103,7 +99,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
|
||||
}
|
||||
|
||||
void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception {
|
||||
checkActive();
|
||||
checkActive(msg.getMsg());
|
||||
if (ruleNode.isDebugMode()) {
|
||||
systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), msg.getFromRelationType());
|
||||
}
|
||||
@ -129,4 +125,8 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
|
||||
return tbNode;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RuleNodeException getInactiveException() {
|
||||
return new RuleNodeException("Rule Node is not active! Failed to initialize.", ruleChainName, ruleNode);
|
||||
}
|
||||
}
|
||||
|
||||
@ -22,7 +22,10 @@ import org.thingsboard.server.actors.stats.StatsPersistTick;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
|
||||
import org.thingsboard.server.common.msg.queue.RuleEngineException;
|
||||
import org.thingsboard.server.common.msg.queue.RuleNodeException;
|
||||
|
||||
@Slf4j
|
||||
public abstract class ComponentMsgProcessor<T extends EntityId> extends AbstractContextAwareMsgProcessor {
|
||||
@ -74,11 +77,17 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
|
||||
schedulePeriodicMsgWithDelay(context, new StatsPersistTick(), statsPersistFrequency, statsPersistFrequency);
|
||||
}
|
||||
|
||||
protected void checkActive() {
|
||||
protected void checkActive(TbMsg tbMsg) throws RuleNodeException {
|
||||
if (state != ComponentLifecycleState.ACTIVE) {
|
||||
log.debug("Component is not active. Current state [{}] for processor [{}][{}] tenant [{}]", state, entityId.getEntityType(), entityId, tenantId);
|
||||
throw new IllegalStateException("Rule chain is not active! " + entityId + " - " + tenantId);
|
||||
RuleNodeException ruleNodeException = getInactiveException();
|
||||
if (tbMsg != null) {
|
||||
tbMsg.getCallback().onFailure(ruleNodeException);
|
||||
}
|
||||
throw ruleNodeException;
|
||||
}
|
||||
}
|
||||
|
||||
abstract protected RuleNodeException getInactiveException();
|
||||
|
||||
}
|
||||
|
||||
@ -338,7 +338,7 @@ public class AuthController extends BaseController {
|
||||
|
||||
@RequestMapping(value = "/noauth/oauth2Clients", method = RequestMethod.POST)
|
||||
@ResponseBody
|
||||
public List<OAuth2ClientInfo> getOath2Clients() throws ThingsboardException {
|
||||
public List<OAuth2ClientInfo> getOAuth2Clients() throws ThingsboardException {
|
||||
try {
|
||||
return oauth2Service.getOAuth2Clients();
|
||||
} catch (Exception e) {
|
||||
|
||||
@ -23,7 +23,6 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.actors.ActorSystemContext;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.TbQueueConsumer;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
|
||||
|
||||
@ -23,7 +23,6 @@ import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
@ -77,8 +76,8 @@ public class BatchTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS
|
||||
}
|
||||
}
|
||||
int submitSize = pendingPack.size();
|
||||
if (log.isInfoEnabled() && submitSize > 0) {
|
||||
log.info("[{}] submitting [{}] messages to rule engine", queueName, submitSize);
|
||||
if (log.isDebugEnabled() && submitSize > 0) {
|
||||
log.debug("[{}] submitting [{}] messages to rule engine", queueName, submitSize);
|
||||
}
|
||||
pendingPack.forEach(msgConsumer);
|
||||
}
|
||||
|
||||
@ -19,14 +19,8 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
public class BurstTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy {
|
||||
@ -37,8 +31,8 @@ public class BurstTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS
|
||||
|
||||
@Override
|
||||
public void submitAttempt(BiConsumer<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgConsumer) {
|
||||
if (log.isInfoEnabled()) {
|
||||
log.info("[{}] submitting [{}] messages to rule engine", queueName, orderedMsgList.size());
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("[{}] submitting [{}] messages to rule engine", queueName, orderedMsgList.size());
|
||||
}
|
||||
orderedMsgList.forEach(pair -> msgConsumer.accept(pair.uuid, pair.msg));
|
||||
}
|
||||
|
||||
@ -15,26 +15,18 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.queue.processing;
|
||||
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.gen.MsgProtos;
|
||||
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
public abstract class SequentialByEntityIdTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy {
|
||||
|
||||
@ -30,6 +30,5 @@ public class SequentialByTenantIdTbRuleEngineSubmitStrategy extends SequentialBy
|
||||
@Override
|
||||
protected EntityId getEntityId(TransportProtos.ToRuleEngineMsg msg) {
|
||||
return new TenantId(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB()));
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -19,8 +19,6 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
@ -63,8 +61,8 @@ public class SequentialTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSu
|
||||
if (idx < listSize) {
|
||||
IdMsgPair pair = orderedMsgList.get(idx);
|
||||
expectedMsgId = pair.uuid;
|
||||
if (log.isInfoEnabled()) {
|
||||
log.info("[{}] submitting [{}] message to rule engine", queueName, pair.msg);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("[{}] submitting [{}] message to rule engine", queueName, pair.msg);
|
||||
}
|
||||
msgConsumer.accept(pair.uuid, pair.msg);
|
||||
}
|
||||
|
||||
@ -82,10 +82,10 @@ public class TbRuleEngineProcessingStrategyFactory {
|
||||
retryCount++;
|
||||
double failedCount = result.getFailedMap().size() + result.getPendingMap().size();
|
||||
if (maxRetries > 0 && retryCount > maxRetries) {
|
||||
log.info("[{}] Skip reprocess of the rule engine pack due to max retries", queueName);
|
||||
log.debug("[{}] Skip reprocess of the rule engine pack due to max retries", queueName);
|
||||
return new TbRuleEngineProcessingDecision(true, null);
|
||||
} else if (maxAllowedFailurePercentage > 0 && (failedCount / initialTotalCount) > maxAllowedFailurePercentage) {
|
||||
log.info("[{}] Skip reprocess of the rule engine pack due to max allowed failure percentage", queueName);
|
||||
log.debug("[{}] Skip reprocess of the rule engine pack due to max allowed failure percentage", queueName);
|
||||
return new TbRuleEngineProcessingDecision(true, null);
|
||||
} else {
|
||||
ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> toReprocess = new ConcurrentHashMap<>(initialTotalCount);
|
||||
@ -98,7 +98,7 @@ public class TbRuleEngineProcessingStrategyFactory {
|
||||
if (retrySuccessful) {
|
||||
result.getSuccessMap().forEach(toReprocess::put);
|
||||
}
|
||||
log.info("[{}] Going to reprocess {} messages", queueName, toReprocess.size());
|
||||
log.debug("[{}] Going to reprocess {} messages", queueName, toReprocess.size());
|
||||
if (log.isTraceEnabled()) {
|
||||
toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
|
||||
}
|
||||
@ -126,7 +126,7 @@ public class TbRuleEngineProcessingStrategyFactory {
|
||||
@Override
|
||||
public TbRuleEngineProcessingDecision analyze(TbRuleEngineProcessingResult result) {
|
||||
if (!result.isSuccess()) {
|
||||
log.info("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size());
|
||||
log.debug("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size());
|
||||
}
|
||||
if (log.isTraceEnabled()) {
|
||||
result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
|
||||
|
||||
@ -51,9 +51,9 @@ public class BasicOAuth2ClientMapper extends AbstractOAuth2ClientMapper implemen
|
||||
String firstName = getStringAttributeByKey(attributes, config.getBasic().getFirstNameAttributeKey());
|
||||
oauth2User.setFirstName(firstName);
|
||||
}
|
||||
if (!StringUtils.isEmpty(config.getBasic().getCustomerNameStrategyPattern())) {
|
||||
if (!StringUtils.isEmpty(config.getBasic().getCustomerNamePattern())) {
|
||||
StrSubstitutor sub = new StrSubstitutor(attributes, START_PLACEHOLDER_PREFIX, END_PLACEHOLDER_PREFIX);
|
||||
String customerName = sub.replace(config.getBasic().getCustomerNameStrategyPattern());
|
||||
String customerName = sub.replace(config.getBasic().getCustomerNamePattern());
|
||||
oauth2User.setCustomerName(customerName);
|
||||
}
|
||||
return getOrCreateSecurityUserFromOAuth2User(oauth2User, config.getBasic().isAllowUserCreation());
|
||||
@ -68,7 +68,7 @@ public class BasicOAuth2ClientMapper extends AbstractOAuth2ClientMapper implemen
|
||||
return email.substring(email .indexOf("@") + 1);
|
||||
case CUSTOM_TENANT_STRATEGY:
|
||||
StrSubstitutor sub = new StrSubstitutor(attributes, START_PLACEHOLDER_PREFIX, END_PLACEHOLDER_PREFIX);
|
||||
return sub.replace(config.getBasic().getTenantNameStrategyPattern());
|
||||
return sub.replace(config.getBasic().getTenantNamePattern());
|
||||
default:
|
||||
throw new RuntimeException("Tenant Name Strategy with type " + config.getBasic().getTenantNameStrategy() + " is not supported!");
|
||||
}
|
||||
@ -78,7 +78,6 @@ public class BasicOAuth2ClientMapper extends AbstractOAuth2ClientMapper implemen
|
||||
String result = null;
|
||||
try {
|
||||
result = (String) attributes.get(key);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.warn("Can't convert attribute to String by key " + key);
|
||||
}
|
||||
|
||||
@ -41,7 +41,7 @@ public class CustomOAuth2ClientMapper extends AbstractOAuth2ClientMapper impleme
|
||||
return getOrCreateSecurityUserFromOAuth2User(oauth2User, config.getBasic().isAllowUserCreation());
|
||||
}
|
||||
|
||||
public OAuth2User getOAuth2User(OAuth2AuthenticationToken token, OAuth2ClientMapperConfig.CustomOAuth2ClientMapperConfig custom) {
|
||||
private synchronized OAuth2User getOAuth2User(OAuth2AuthenticationToken token, OAuth2ClientMapperConfig.CustomOAuth2ClientMapperConfig custom) {
|
||||
if (!StringUtils.isEmpty(custom.getUsername()) && !StringUtils.isEmpty(custom.getPassword())) {
|
||||
restTemplateBuilder = restTemplateBuilder.basicAuthentication(custom.getUsername(), custom.getPassword());
|
||||
}
|
||||
|
||||
@ -126,8 +126,8 @@ security:
|
||||
firstNameAttributeKey: "${SECURITY_OAUTH2_DEFAULT_MAPPER_BASIC_FIRST_NAME_ATTRIBUTE_KEY:}"
|
||||
lastNameAttributeKey: "${SECURITY_OAUTH2_DEFAULT_MAPPER_BASIC_LAST_NAME_ATTRIBUTE_KEY:}"
|
||||
tenantNameStrategy: "${SECURITY_OAUTH2_DEFAULT_MAPPER_BASIC_TENANT_NAME_STRATEGY:domain}" # domain, email or custom
|
||||
tenantNameStrategyPattern: "${SECURITY_OAUTH2_DEFAULT_MAPPER_BASIC_TENANT_NAME_STRATEGY_PATTERN:}"
|
||||
customerNameStrategyPattern: "${SECURITY_OAUTH2_DEFAULT_MAPPER_BASIC_CUSTOMER_NAME_STRATEGY_PATTERN:}"
|
||||
tenantNamePattern: "${SECURITY_OAUTH2_DEFAULT_MAPPER_BASIC_TENANT_NAME_PATTERN:}" # %{attribute_key} as placeholder for attributes value by key
|
||||
customerNamePattern: "${SECURITY_OAUTH2_DEFAULT_MAPPER_BASIC_CUSTOMER_NAME_PATTERN:}" # %{attribute_key} as placeholder for attributes value by key
|
||||
custom:
|
||||
url: "${SECURITY_OAUTH2_DEFAULT_MAPPER_CUSTOM_URL:}"
|
||||
username: "${SECURITY_OAUTH2_DEFAULT_MAPPER_CUSTOM_USERNAME:}"
|
||||
|
||||
@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.id.RuleNodeId;
|
||||
import org.thingsboard.server.common.msg.gen.MsgProtos;
|
||||
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.UUID;
|
||||
|
||||
@ -47,7 +48,7 @@ public final class TbMsg implements Serializable {
|
||||
private final RuleChainId ruleChainId;
|
||||
private final RuleNodeId ruleNodeId;
|
||||
//This field is not serialized because we use queues and there is no need to do it
|
||||
private final TbMsgCallback callback;
|
||||
transient private final TbMsgCallback callback;
|
||||
|
||||
public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) {
|
||||
return new TbMsg(UUID.randomUUID(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, TbMsgCallback.EMPTY);
|
||||
@ -156,4 +157,13 @@ public final class TbMsg implements Serializable {
|
||||
public TbMsg copyWithRuleNodeId(RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
|
||||
return new TbMsg(this.id, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, callback);
|
||||
}
|
||||
|
||||
public TbMsgCallback getCallback() {
|
||||
//May be null in case of deserialization;
|
||||
if (callback != null) {
|
||||
return callback;
|
||||
} else {
|
||||
return TbMsgCallback.EMPTY;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -36,9 +36,15 @@ public class RuleNodeException extends RuleEngineException {
|
||||
public RuleNodeException(String message, String ruleChainName, RuleNode ruleNode) {
|
||||
super(message);
|
||||
this.ruleChainName = ruleChainName;
|
||||
this.ruleNodeName = ruleNode.getName();
|
||||
this.ruleChainId = ruleNode.getRuleChainId();
|
||||
this.ruleNodeId = ruleNode.getId();
|
||||
if (ruleNode != null) {
|
||||
this.ruleNodeName = ruleNode.getName();
|
||||
this.ruleChainId = ruleNode.getRuleChainId();
|
||||
this.ruleNodeId = ruleNode.getId();
|
||||
} else {
|
||||
ruleNodeName = "Unknown";
|
||||
ruleChainId = new RuleChainId(RuleChainId.NULL_UUID);
|
||||
ruleNodeId = new RuleNodeId(RuleNodeId.NULL_UUID);
|
||||
}
|
||||
}
|
||||
|
||||
public String toJsonString() {
|
||||
|
||||
@ -67,6 +67,7 @@ public class TbServiceBusAdmin implements TbQueueAdmin {
|
||||
|
||||
try {
|
||||
QueueDescription queueDescription = new QueueDescription(topic);
|
||||
queueDescription.setRequiresDuplicateDetection(false);
|
||||
setQueueConfigs(queueDescription);
|
||||
|
||||
client.createQueue(queueDescription);
|
||||
|
||||
@ -31,9 +31,9 @@ import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.queue.TbQueueAdmin;
|
||||
import org.thingsboard.server.queue.TbQueueConsumer;
|
||||
import org.thingsboard.server.queue.TbQueueMsg;
|
||||
import org.thingsboard.server.queue.TbQueueMsgDecoder;
|
||||
import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate;
|
||||
import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
|
||||
|
||||
import java.time.Duration;
|
||||
@ -50,100 +50,70 @@ import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@Slf4j
|
||||
public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> {
|
||||
public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<MessageWithDeliveryTag, T> {
|
||||
private final TbQueueAdmin admin;
|
||||
private final String topic;
|
||||
private final TbQueueMsgDecoder<T> decoder;
|
||||
private final TbServiceBusSettings serviceBusSettings;
|
||||
|
||||
private final Gson gson = new Gson();
|
||||
|
||||
private Set<CoreMessageReceiver> receivers;
|
||||
private volatile Set<TopicPartitionInfo> partitions;
|
||||
private volatile boolean subscribed;
|
||||
private volatile boolean stopped = false;
|
||||
private Map<CoreMessageReceiver, Collection<MessageWithDeliveryTag>> pendingMessages = new ConcurrentHashMap<>();
|
||||
private final Map<CoreMessageReceiver, Collection<MessageWithDeliveryTag>> pendingMessages = new ConcurrentHashMap<>();
|
||||
private volatile int messagesPerQueue;
|
||||
|
||||
public TbServiceBusConsumerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String topic, TbQueueMsgDecoder<T> decoder) {
|
||||
super(topic);
|
||||
this.admin = admin;
|
||||
this.decoder = decoder;
|
||||
this.topic = topic;
|
||||
this.serviceBusSettings = serviceBusSettings;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTopic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe() {
|
||||
partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));
|
||||
subscribed = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(Set<TopicPartitionInfo> partitions) {
|
||||
this.partitions = partitions;
|
||||
subscribed = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unsubscribe() {
|
||||
stopped = true;
|
||||
receivers.forEach(CoreMessageReceiver::closeAsync);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<T> poll(long durationInMillis) {
|
||||
if (!subscribed && partitions == null) {
|
||||
try {
|
||||
Thread.sleep(durationInMillis);
|
||||
} catch (InterruptedException e) {
|
||||
log.debug("Failed to await subscription", e);
|
||||
}
|
||||
} else {
|
||||
if (!subscribed) {
|
||||
createReceivers();
|
||||
messagesPerQueue = receivers.size() / partitions.size();
|
||||
subscribed = true;
|
||||
}
|
||||
|
||||
List<CompletableFuture<Collection<MessageWithDeliveryTag>>> messageFutures =
|
||||
receivers.stream()
|
||||
.map(receiver -> receiver
|
||||
.receiveAsync(messagesPerQueue, Duration.ofMillis(durationInMillis))
|
||||
.whenComplete((messages, err) -> {
|
||||
if (!CollectionUtils.isEmpty(messages)) {
|
||||
pendingMessages.put(receiver, messages);
|
||||
} else if (err != null) {
|
||||
log.error("Failed to receive messages.", err);
|
||||
}
|
||||
}))
|
||||
.collect(Collectors.toList());
|
||||
try {
|
||||
return fromList(messageFutures)
|
||||
.get()
|
||||
.stream()
|
||||
.flatMap(messages -> CollectionUtils.isEmpty(messages) ? Stream.empty() : messages.stream())
|
||||
.map(message -> {
|
||||
try {
|
||||
return decode(message);
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
log.error("Failed to parse message.", e);
|
||||
throw new RuntimeException("Failed to parse message.", e);
|
||||
}
|
||||
}).collect(Collectors.toList());
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
if (stopped) {
|
||||
log.info("[{}] Service Bus consumer is stopped.", topic);
|
||||
} else {
|
||||
log.error("Failed to receive messages", e);
|
||||
}
|
||||
protected List<MessageWithDeliveryTag> doPoll(long durationInMillis) {
|
||||
List<CompletableFuture<Collection<MessageWithDeliveryTag>>> messageFutures =
|
||||
receivers.stream()
|
||||
.map(receiver -> receiver
|
||||
.receiveAsync(messagesPerQueue, Duration.ofMillis(durationInMillis))
|
||||
.whenComplete((messages, err) -> {
|
||||
if (!CollectionUtils.isEmpty(messages)) {
|
||||
pendingMessages.put(receiver, messages);
|
||||
} else if (err != null) {
|
||||
log.error("Failed to receive messages.", err);
|
||||
}
|
||||
}))
|
||||
.collect(Collectors.toList());
|
||||
try {
|
||||
return fromList(messageFutures)
|
||||
.get()
|
||||
.stream()
|
||||
.flatMap(messages -> CollectionUtils.isEmpty(messages) ? Stream.empty() : messages.stream())
|
||||
.collect(Collectors.toList());
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
if (stopped) {
|
||||
log.info("[{}] Service Bus consumer is stopped.", getTopic());
|
||||
} else {
|
||||
log.error("Failed to receive messages", e);
|
||||
}
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doSubscribe(List<String> topicNames) {
|
||||
createReceivers();
|
||||
messagesPerQueue = receivers.size() / partitions.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doCommit() {
|
||||
pendingMessages.forEach((receiver, msgs) ->
|
||||
msgs.forEach(msg -> receiver.completeMessageAsync(msg.getDeliveryTag(), TransactionContext.NULL_TXN)));
|
||||
pendingMessages.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doUnsubscribe() {
|
||||
receivers.forEach(CoreMessageReceiver::closeAsync);
|
||||
}
|
||||
|
||||
private void createReceivers() {
|
||||
@ -167,7 +137,7 @@ public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> implements TbQue
|
||||
receivers = new HashSet<>(fromList(receiverFutures).get());
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
if (stopped) {
|
||||
log.info("[{}] Service Bus consumer is stopped.", topic);
|
||||
log.info("[{}] Service Bus consumer is stopped.", getTopic());
|
||||
} else {
|
||||
log.error("Failed to create receivers", e);
|
||||
}
|
||||
@ -196,13 +166,7 @@ public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> implements TbQue
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commit() {
|
||||
pendingMessages.forEach((receiver, msgs) ->
|
||||
msgs.forEach(msg -> receiver.completeMessageAsync(msg.getDeliveryTag(), TransactionContext.NULL_TXN)));
|
||||
pendingMessages.clear();
|
||||
}
|
||||
|
||||
private T decode(MessageWithDeliveryTag data) throws InvalidProtocolBufferException {
|
||||
protected T decode(MessageWithDeliveryTag data) throws InvalidProtocolBufferException {
|
||||
DefaultTbQueueMsg msg = gson.fromJson(new String(((Data) data.getMessage().getBody()).getValue().getArray()), DefaultTbQueueMsg.class);
|
||||
return decoder.decode(msg);
|
||||
}
|
||||
|
||||
@ -33,6 +33,7 @@ import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
@ -42,14 +43,14 @@ public class TbServiceBusProducerTemplate<T extends TbQueueMsg> implements TbQue
|
||||
private final Gson gson = new Gson();
|
||||
private final TbQueueAdmin admin;
|
||||
private final TbServiceBusSettings serviceBusSettings;
|
||||
private final Map<String, QueueClient> clients = new HashMap<>();
|
||||
private ExecutorService executorService;
|
||||
private final Map<String, QueueClient> clients = new ConcurrentHashMap<>();
|
||||
private final ExecutorService executorService;
|
||||
|
||||
public TbServiceBusProducerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String defaultTopic) {
|
||||
this.admin = admin;
|
||||
this.defaultTopic = defaultTopic;
|
||||
this.serviceBusSettings = serviceBusSettings;
|
||||
executorService = Executors.newSingleThreadExecutor();
|
||||
executorService = Executors.newCachedThreadPool();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -0,0 +1,53 @@
|
||||
/**
|
||||
* Copyright © 2016-2020 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.queue.common;
|
||||
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.queue.TbQueueMsg;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Slf4j
|
||||
public abstract class AbstractParallelTbQueueConsumerTemplate<R, T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<R, T> {
|
||||
|
||||
protected ListeningExecutorService consumerExecutor;
|
||||
|
||||
public AbstractParallelTbQueueConsumerTemplate(String topic) {
|
||||
super(topic);
|
||||
}
|
||||
|
||||
protected void initNewExecutor(int threadPoolSize) {
|
||||
if (consumerExecutor != null) {
|
||||
consumerExecutor.shutdown();
|
||||
try {
|
||||
consumerExecutor.awaitTermination(1, TimeUnit.MINUTES);
|
||||
} catch (InterruptedException e) {
|
||||
log.trace("Interrupted while waiting for consumer executor to stop");
|
||||
}
|
||||
}
|
||||
consumerExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadPoolSize));
|
||||
}
|
||||
|
||||
protected void shutdownExecutor() {
|
||||
if (consumerExecutor != null) {
|
||||
consumerExecutor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,152 @@
|
||||
/**
|
||||
* Copyright © 2016-2020 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.queue.common;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.queue.TbQueueConsumer;
|
||||
import org.thingsboard.server.queue.TbQueueMsg;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> implements TbQueueConsumer<T> {
|
||||
|
||||
private volatile boolean subscribed;
|
||||
protected volatile boolean stopped = false;
|
||||
protected volatile Set<TopicPartitionInfo> partitions;
|
||||
protected final Lock consumerLock = new ReentrantLock();
|
||||
|
||||
@Getter
|
||||
private final String topic;
|
||||
|
||||
public AbstractTbQueueConsumerTemplate(String topic) {
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe() {
|
||||
consumerLock.lock();
|
||||
try {
|
||||
partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));
|
||||
subscribed = false;
|
||||
} finally {
|
||||
consumerLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(Set<TopicPartitionInfo> partitions) {
|
||||
consumerLock.lock();
|
||||
try {
|
||||
this.partitions = partitions;
|
||||
subscribed = false;
|
||||
} finally {
|
||||
consumerLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<T> poll(long durationInMillis) {
|
||||
if (!subscribed && partitions == null) {
|
||||
try {
|
||||
Thread.sleep(durationInMillis);
|
||||
} catch (InterruptedException e) {
|
||||
log.debug("Failed to await subscription", e);
|
||||
}
|
||||
} else {
|
||||
long pollStartTs = System.currentTimeMillis();
|
||||
consumerLock.lock();
|
||||
try {
|
||||
if (!subscribed) {
|
||||
List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
|
||||
doSubscribe(topicNames);
|
||||
subscribed = true;
|
||||
}
|
||||
|
||||
List<R> records = doPoll(durationInMillis);
|
||||
if (!records.isEmpty()) {
|
||||
List<T> result = new ArrayList<>(records.size());
|
||||
records.forEach(record -> {
|
||||
try {
|
||||
if (record != null) {
|
||||
result.add(decode(record));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
log.error("Failed decode record: [{}]", record);
|
||||
throw new RuntimeException("Failed to decode record: ", e);
|
||||
}
|
||||
});
|
||||
return result;
|
||||
} else {
|
||||
long pollDuration = System.currentTimeMillis() - pollStartTs;
|
||||
if (pollDuration < durationInMillis) {
|
||||
try {
|
||||
Thread.sleep(durationInMillis - pollDuration);
|
||||
} catch (InterruptedException e) {
|
||||
if (!stopped) {
|
||||
log.error("Failed to wait.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
consumerLock.unlock();
|
||||
}
|
||||
}
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commit() {
|
||||
consumerLock.lock();
|
||||
try {
|
||||
doCommit();
|
||||
} finally {
|
||||
consumerLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unsubscribe() {
|
||||
stopped = true;
|
||||
consumerLock.lock();
|
||||
try {
|
||||
doUnsubscribe();
|
||||
} finally {
|
||||
consumerLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
abstract protected List<R> doPoll(long durationInMillis);
|
||||
|
||||
abstract protected T decode(R record) throws IOException;
|
||||
|
||||
abstract protected void doSubscribe(List<String> topicNames);
|
||||
|
||||
abstract protected void doCommit();
|
||||
|
||||
abstract protected void doUnsubscribe();
|
||||
|
||||
}
|
||||
@ -16,16 +16,14 @@
|
||||
package org.thingsboard.server.queue.kafka;
|
||||
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.queue.TbQueueAdmin;
|
||||
import org.thingsboard.server.queue.TbQueueConsumer;
|
||||
import org.thingsboard.server.queue.TbQueueMsg;
|
||||
import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
@ -33,26 +31,16 @@ import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Created by ashvayka on 24.09.18.
|
||||
*/
|
||||
@Slf4j
|
||||
public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> {
|
||||
public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<ConsumerRecord<String, byte[]>, T> {
|
||||
|
||||
private final TbQueueAdmin admin;
|
||||
private final KafkaConsumer<String, byte[]> consumer;
|
||||
private final TbKafkaDecoder<T> decoder;
|
||||
private volatile boolean subscribed;
|
||||
private volatile Set<TopicPartitionInfo> partitions;
|
||||
private final Lock consumerLock;
|
||||
|
||||
@Getter
|
||||
private final String topic;
|
||||
|
||||
@Builder
|
||||
private TbKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder,
|
||||
@ -60,6 +48,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
|
||||
boolean autoCommit, int autoCommitIntervalMs,
|
||||
int maxPollRecords,
|
||||
TbQueueAdmin admin) {
|
||||
super(topic);
|
||||
Properties props = settings.toProps();
|
||||
props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
|
||||
if (groupId != null) {
|
||||
@ -75,94 +64,42 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
|
||||
this.admin = admin;
|
||||
this.consumer = new KafkaConsumer<>(props);
|
||||
this.decoder = decoder;
|
||||
this.topic = topic;
|
||||
this.consumerLock = new ReentrantLock();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe() {
|
||||
consumerLock.lock();
|
||||
try {
|
||||
partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));
|
||||
subscribed = false;
|
||||
} finally {
|
||||
consumerLock.unlock();
|
||||
}
|
||||
protected void doSubscribe(List<String> topicNames) {
|
||||
topicNames.forEach(admin::createTopicIfNotExists);
|
||||
consumer.subscribe(topicNames);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(Set<TopicPartitionInfo> partitions) {
|
||||
consumerLock.lock();
|
||||
try {
|
||||
this.partitions = partitions;
|
||||
subscribed = false;
|
||||
} finally {
|
||||
consumerLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<T> poll(long durationInMillis) {
|
||||
if (!subscribed && partitions == null) {
|
||||
try {
|
||||
Thread.sleep(durationInMillis);
|
||||
} catch (InterruptedException e) {
|
||||
log.debug("Failed to await subscription", e);
|
||||
}
|
||||
protected List<ConsumerRecord<String, byte[]>> doPoll(long durationInMillis) {
|
||||
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(durationInMillis));
|
||||
if (records.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
} else {
|
||||
consumerLock.lock();
|
||||
try {
|
||||
if (!subscribed) {
|
||||
List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
|
||||
topicNames.forEach(admin::createTopicIfNotExists);
|
||||
consumer.subscribe(topicNames);
|
||||
subscribed = true;
|
||||
}
|
||||
|
||||
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(durationInMillis));
|
||||
if (records.count() > 0) {
|
||||
List<T> result = new ArrayList<>();
|
||||
records.forEach(record -> {
|
||||
try {
|
||||
result.add(decode(record));
|
||||
} catch (IOException e) {
|
||||
log.error("Failed decode record: [{}]", record);
|
||||
}
|
||||
});
|
||||
return result;
|
||||
}
|
||||
} finally {
|
||||
consumerLock.unlock();
|
||||
}
|
||||
}
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commit() {
|
||||
consumerLock.lock();
|
||||
try {
|
||||
consumer.commitAsync();
|
||||
} finally {
|
||||
consumerLock.unlock();
|
||||
List<ConsumerRecord<String, byte[]>> recordList = new ArrayList<>(256);
|
||||
records.forEach(recordList::add);
|
||||
return recordList;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unsubscribe() {
|
||||
consumerLock.lock();
|
||||
try {
|
||||
if (consumer != null) {
|
||||
consumer.unsubscribe();
|
||||
consumer.close();
|
||||
}
|
||||
} finally {
|
||||
consumerLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public T decode(ConsumerRecord<String, byte[]> record) throws IOException {
|
||||
return decoder.decode(new KafkaTbQueueMsg(record));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doCommit() {
|
||||
consumer.commitAsync();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doUnsubscribe() {
|
||||
if (consumer != null) {
|
||||
consumer.unsubscribe();
|
||||
consumer.close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
*/
|
||||
package org.thingsboard.server.queue.pubsub;
|
||||
|
||||
import com.google.api.gax.rpc.AlreadyExistsException;
|
||||
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
|
||||
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
|
||||
import com.google.cloud.pubsub.v1.TopicAdminClient;
|
||||
@ -24,9 +25,9 @@ import com.google.pubsub.v1.ListSubscriptionsRequest;
|
||||
import com.google.pubsub.v1.ListTopicsRequest;
|
||||
import com.google.pubsub.v1.ProjectName;
|
||||
import com.google.pubsub.v1.ProjectSubscriptionName;
|
||||
import com.google.pubsub.v1.ProjectTopicName;
|
||||
import com.google.pubsub.v1.Subscription;
|
||||
import com.google.pubsub.v1.Topic;
|
||||
import com.google.pubsub.v1.TopicName;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.queue.TbQueueAdmin;
|
||||
|
||||
@ -103,7 +104,10 @@ public class TbPubSubAdmin implements TbQueueAdmin {
|
||||
|
||||
@Override
|
||||
public void createTopicIfNotExists(String partition) {
|
||||
ProjectTopicName topicName = ProjectTopicName.of(pubSubSettings.getProjectId(), partition);
|
||||
TopicName topicName = TopicName.newBuilder()
|
||||
.setTopic(partition)
|
||||
.setProject(pubSubSettings.getProjectId())
|
||||
.build();
|
||||
|
||||
if (topicSet.contains(topicName.toString())) {
|
||||
createSubscriptionIfNotExists(partition, topicName);
|
||||
@ -121,13 +125,18 @@ public class TbPubSubAdmin implements TbQueueAdmin {
|
||||
}
|
||||
}
|
||||
|
||||
topicAdminClient.createTopic(topicName);
|
||||
topicSet.add(topicName.toString());
|
||||
log.info("Created new topic: [{}]", topicName.toString());
|
||||
try {
|
||||
topicAdminClient.createTopic(topicName);
|
||||
log.info("Created new topic: [{}]", topicName.toString());
|
||||
} catch (AlreadyExistsException e) {
|
||||
log.info("[{}] Topic already exist.", topicName.toString());
|
||||
} finally {
|
||||
topicSet.add(topicName.toString());
|
||||
}
|
||||
createSubscriptionIfNotExists(partition, topicName);
|
||||
}
|
||||
|
||||
private void createSubscriptionIfNotExists(String partition, ProjectTopicName topicName) {
|
||||
private void createSubscriptionIfNotExists(String partition, TopicName topicName) {
|
||||
ProjectSubscriptionName subscriptionName =
|
||||
ProjectSubscriptionName.of(pubSubSettings.getProjectId(), partition);
|
||||
|
||||
@ -153,9 +162,14 @@ public class TbPubSubAdmin implements TbQueueAdmin {
|
||||
setAckDeadline(subscriptionBuilder);
|
||||
setMessageRetention(subscriptionBuilder);
|
||||
|
||||
subscriptionAdminClient.createSubscription(subscriptionBuilder.build());
|
||||
subscriptionSet.add(subscriptionName.toString());
|
||||
log.info("Created new subscription: [{}]", subscriptionName.toString());
|
||||
try {
|
||||
subscriptionAdminClient.createSubscription(subscriptionBuilder.build());
|
||||
log.info("Created new subscription: [{}]", subscriptionName.toString());
|
||||
} catch (AlreadyExistsException e) {
|
||||
log.info("[{}] Subscription already exist.", subscriptionName.toString());
|
||||
} finally {
|
||||
subscriptionSet.add(subscriptionName.toString());
|
||||
}
|
||||
}
|
||||
|
||||
private void setAckDeadline(Subscription.Builder builder) {
|
||||
|
||||
@ -30,27 +30,25 @@ import com.google.pubsub.v1.PullResponse;
|
||||
import com.google.pubsub.v1.ReceivedMessage;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.queue.TbQueueAdmin;
|
||||
import org.thingsboard.server.queue.TbQueueConsumer;
|
||||
import org.thingsboard.server.queue.TbQueueMsg;
|
||||
import org.thingsboard.server.queue.TbQueueMsgDecoder;
|
||||
import org.thingsboard.server.queue.common.AbstractParallelTbQueueConsumerTemplate;
|
||||
import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> {
|
||||
public class TbPubSubConsumerTemplate<T extends TbQueueMsg> extends AbstractParallelTbQueueConsumerTemplate<PubsubMessage, T> {
|
||||
|
||||
private final Gson gson = new Gson();
|
||||
private final TbQueueAdmin admin;
|
||||
@ -58,23 +56,18 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
|
||||
private final TbQueueMsgDecoder<T> decoder;
|
||||
private final TbPubSubSettings pubSubSettings;
|
||||
|
||||
private volatile boolean subscribed;
|
||||
private volatile Set<TopicPartitionInfo> partitions;
|
||||
private volatile Set<String> subscriptionNames;
|
||||
private final List<AcknowledgeRequest> acknowledgeRequests = new CopyOnWriteArrayList<>();
|
||||
|
||||
private ExecutorService consumerExecutor;
|
||||
private final SubscriberStub subscriber;
|
||||
private volatile boolean stopped;
|
||||
|
||||
private volatile int messagesPerTopic;
|
||||
|
||||
public TbPubSubConsumerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String topic, TbQueueMsgDecoder<T> decoder) {
|
||||
super(topic);
|
||||
this.admin = admin;
|
||||
this.pubSubSettings = pubSubSettings;
|
||||
this.topic = topic;
|
||||
this.decoder = decoder;
|
||||
|
||||
try {
|
||||
SubscriberStubSettings subscriberStubSettings =
|
||||
SubscriberStubSettings.newBuilder()
|
||||
@ -84,98 +77,59 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
|
||||
.setMaxInboundMessageSize(pubSubSettings.getMaxMsgSize())
|
||||
.build())
|
||||
.build();
|
||||
|
||||
this.subscriber = GrpcSubscriberStub.create(subscriberStubSettings);
|
||||
} catch (IOException e) {
|
||||
log.error("Failed to create subscriber.", e);
|
||||
throw new RuntimeException("Failed to create subscriber.", e);
|
||||
}
|
||||
stopped = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTopic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe() {
|
||||
partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));
|
||||
subscribed = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(Set<TopicPartitionInfo> partitions) {
|
||||
this.partitions = partitions;
|
||||
subscribed = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unsubscribe() {
|
||||
stopped = true;
|
||||
if (consumerExecutor != null) {
|
||||
consumerExecutor.shutdownNow();
|
||||
}
|
||||
|
||||
if (subscriber != null) {
|
||||
subscriber.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<T> poll(long durationInMillis) {
|
||||
if (!subscribed && partitions == null) {
|
||||
try {
|
||||
Thread.sleep(durationInMillis);
|
||||
} catch (InterruptedException e) {
|
||||
log.debug("Failed to await subscription", e);
|
||||
protected List<PubsubMessage> doPoll(long durationInMillis) {
|
||||
try {
|
||||
List<ReceivedMessage> messages = receiveMessages();
|
||||
if (!messages.isEmpty()) {
|
||||
return messages.stream().map(ReceivedMessage::getMessage).collect(Collectors.toList());
|
||||
}
|
||||
} else {
|
||||
if (!subscribed) {
|
||||
subscriptionNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toSet());
|
||||
subscriptionNames.forEach(admin::createTopicIfNotExists);
|
||||
consumerExecutor = Executors.newFixedThreadPool(subscriptionNames.size());
|
||||
messagesPerTopic = pubSubSettings.getMaxMessages() / subscriptionNames.size();
|
||||
subscribed = true;
|
||||
}
|
||||
List<ReceivedMessage> messages;
|
||||
try {
|
||||
messages = receiveMessages();
|
||||
if (!messages.isEmpty()) {
|
||||
List<T> result = new ArrayList<>();
|
||||
messages.forEach(msg -> {
|
||||
try {
|
||||
result.add(decode(msg.getMessage()));
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
log.error("Failed decode record: [{}]", msg);
|
||||
}
|
||||
});
|
||||
return result;
|
||||
}
|
||||
} catch (ExecutionException | InterruptedException e) {
|
||||
if (stopped) {
|
||||
log.info("[{}] Pub/Sub consumer is stopped.", topic);
|
||||
} else {
|
||||
log.error("Failed to receive messages", e);
|
||||
}
|
||||
} catch (ExecutionException | InterruptedException e) {
|
||||
if (stopped) {
|
||||
log.info("[{}] Pub/Sub consumer is stopped.", topic);
|
||||
} else {
|
||||
log.error("Failed to receive messages", e);
|
||||
}
|
||||
}
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commit() {
|
||||
protected void doSubscribe(List<String> topicNames) {
|
||||
subscriptionNames = new LinkedHashSet<>(topicNames);
|
||||
subscriptionNames.forEach(admin::createTopicIfNotExists);
|
||||
initNewExecutor(subscriptionNames.size() + 1);
|
||||
messagesPerTopic = pubSubSettings.getMaxMessages() / subscriptionNames.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doCommit() {
|
||||
acknowledgeRequests.forEach(subscriber.acknowledgeCallable()::futureCall);
|
||||
acknowledgeRequests.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doUnsubscribe() {
|
||||
if (subscriber != null) {
|
||||
subscriber.close();
|
||||
}
|
||||
shutdownExecutor();
|
||||
}
|
||||
|
||||
private List<ReceivedMessage> receiveMessages() throws ExecutionException, InterruptedException {
|
||||
List<ApiFuture<List<ReceivedMessage>>> result = subscriptionNames.stream().map(subscriptionId -> {
|
||||
String subscriptionName = ProjectSubscriptionName.format(pubSubSettings.getProjectId(), subscriptionId);
|
||||
PullRequest pullRequest =
|
||||
PullRequest.newBuilder()
|
||||
.setMaxMessages(messagesPerTopic)
|
||||
.setReturnImmediately(false) // return immediately if messages are not available
|
||||
// .setReturnImmediately(false) // return immediately if messages are not available
|
||||
.setSubscription(subscriptionName)
|
||||
.build();
|
||||
|
||||
@ -211,6 +165,7 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
|
||||
return transform.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public T decode(PubsubMessage message) throws InvalidProtocolBufferException {
|
||||
DefaultTbQueueMsg msg = gson.fromJson(message.getData().toStringUtf8(), DefaultTbQueueMsg.class);
|
||||
return decoder.decode(msg);
|
||||
|
||||
@ -49,7 +49,7 @@ public class TbPubSubProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
|
||||
|
||||
private final Map<String, Publisher> publisherMap = new ConcurrentHashMap<>();
|
||||
|
||||
private ExecutorService pubExecutor = Executors.newCachedThreadPool();
|
||||
private final ExecutorService pubExecutor = Executors.newCachedThreadPool();
|
||||
|
||||
public TbPubSubProducerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String defaultTopic) {
|
||||
this.defaultTopic = defaultTopic;
|
||||
@ -124,8 +124,8 @@ public class TbPubSubProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
|
||||
publisherMap.put(topic, publisher);
|
||||
return publisher;
|
||||
} catch (IOException e) {
|
||||
log.error("Failed to create topic [{}].", topic, e);
|
||||
throw new RuntimeException("Failed to create topic.", e);
|
||||
log.error("Failed to create Publisher for the topic [{}].", topic, e);
|
||||
throw new RuntimeException("Failed to create Publisher for the topic.", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -27,13 +27,11 @@ import java.util.concurrent.TimeoutException;
|
||||
@Slf4j
|
||||
public class TbRabbitMqAdmin implements TbQueueAdmin {
|
||||
|
||||
private final TbRabbitMqSettings rabbitMqSettings;
|
||||
private final Channel channel;
|
||||
private final Connection connection;
|
||||
private final Map<String, Object> arguments;
|
||||
|
||||
public TbRabbitMqAdmin(TbRabbitMqSettings rabbitMqSettings, Map<String, Object> arguments) {
|
||||
this.rabbitMqSettings = rabbitMqSettings;
|
||||
this.arguments = arguments;
|
||||
|
||||
try {
|
||||
|
||||
@ -23,9 +23,9 @@ import com.rabbitmq.client.GetResponse;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.queue.TbQueueAdmin;
|
||||
import org.thingsboard.server.queue.TbQueueConsumer;
|
||||
import org.thingsboard.server.queue.TbQueueMsg;
|
||||
import org.thingsboard.server.queue.TbQueueMsgDecoder;
|
||||
import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate;
|
||||
import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -37,33 +37,26 @@ import java.util.concurrent.TimeoutException;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> {
|
||||
public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<GetResponse, T> {
|
||||
|
||||
private final Gson gson = new Gson();
|
||||
private final TbQueueAdmin admin;
|
||||
private final String topic;
|
||||
private final TbQueueMsgDecoder<T> decoder;
|
||||
private final TbRabbitMqSettings rabbitMqSettings;
|
||||
private final Channel channel;
|
||||
private final Connection connection;
|
||||
|
||||
private volatile Set<TopicPartitionInfo> partitions;
|
||||
private volatile boolean subscribed;
|
||||
private volatile Set<String> queues;
|
||||
private volatile boolean stopped;
|
||||
|
||||
public TbRabbitMqConsumerTemplate(TbQueueAdmin admin, TbRabbitMqSettings rabbitMqSettings, String topic, TbQueueMsgDecoder<T> decoder) {
|
||||
super(topic);
|
||||
this.admin = admin;
|
||||
this.decoder = decoder;
|
||||
this.topic = topic;
|
||||
this.rabbitMqSettings = rabbitMqSettings;
|
||||
try {
|
||||
connection = rabbitMqSettings.getConnectionFactory().newConnection();
|
||||
} catch (IOException | TimeoutException e) {
|
||||
log.error("Failed to create connection.", e);
|
||||
throw new RuntimeException("Failed to create connection.", e);
|
||||
}
|
||||
|
||||
try {
|
||||
channel = connection.createChannel();
|
||||
} catch (IOException e) {
|
||||
@ -74,25 +67,42 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> implements TbQueue
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTopic() {
|
||||
return topic;
|
||||
protected List<GetResponse> doPoll(long durationInMillis) {
|
||||
List<GetResponse> result = queues.stream()
|
||||
.map(queue -> {
|
||||
try {
|
||||
return channel.basicGet(queue, false);
|
||||
} catch (IOException e) {
|
||||
log.error("Failed to get messages from queue: [{}]", queue);
|
||||
throw new RuntimeException("Failed to get messages from queue.", e);
|
||||
}
|
||||
}).filter(Objects::nonNull).collect(Collectors.toList());
|
||||
if (result.size() > 0) {
|
||||
return result;
|
||||
} else {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe() {
|
||||
partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));
|
||||
subscribed = false;
|
||||
protected void doSubscribe(List<String> topicNames) {
|
||||
queues = partitions.stream()
|
||||
.map(TopicPartitionInfo::getFullTopicName)
|
||||
.collect(Collectors.toSet());
|
||||
queues.forEach(admin::createTopicIfNotExists);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(Set<TopicPartitionInfo> partitions) {
|
||||
this.partitions = partitions;
|
||||
subscribed = false;
|
||||
protected void doCommit() {
|
||||
try {
|
||||
channel.basicAck(0, true);
|
||||
} catch (IOException e) {
|
||||
log.error("Failed to ack messages.", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unsubscribe() {
|
||||
stopped = true;
|
||||
protected void doUnsubscribe() {
|
||||
if (channel != null) {
|
||||
try {
|
||||
channel.close();
|
||||
@ -109,63 +119,6 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> implements TbQueue
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<T> poll(long durationInMillis) {
|
||||
if (!subscribed && partitions == null) {
|
||||
try {
|
||||
Thread.sleep(durationInMillis);
|
||||
} catch (InterruptedException e) {
|
||||
log.debug("Failed to await subscription", e);
|
||||
}
|
||||
} else {
|
||||
if (!subscribed) {
|
||||
queues = partitions.stream()
|
||||
.map(TopicPartitionInfo::getFullTopicName)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
queues.forEach(admin::createTopicIfNotExists);
|
||||
subscribed = true;
|
||||
}
|
||||
|
||||
List<T> result = queues.stream()
|
||||
.map(queue -> {
|
||||
try {
|
||||
return channel.basicGet(queue, false);
|
||||
} catch (IOException e) {
|
||||
log.error("Failed to get messages from queue: [{}]", queue);
|
||||
throw new RuntimeException("Failed to get messages from queue.", e);
|
||||
}
|
||||
}).filter(Objects::nonNull).map(message -> {
|
||||
try {
|
||||
return decode(message);
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
log.error("Failed to decode message: [{}].", message);
|
||||
throw new RuntimeException("Failed to decode message.", e);
|
||||
}
|
||||
}).collect(Collectors.toList());
|
||||
if (result.size() > 0) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
try {
|
||||
Thread.sleep(durationInMillis);
|
||||
} catch (InterruptedException e) {
|
||||
if (!stopped) {
|
||||
log.error("Failed to wait.", e);
|
||||
}
|
||||
}
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commit() {
|
||||
try {
|
||||
channel.basicAck(0, true);
|
||||
} catch (IOException e) {
|
||||
log.error("Failed to ack messages.", e);
|
||||
}
|
||||
}
|
||||
|
||||
public T decode(GetResponse message) throws InvalidProtocolBufferException {
|
||||
DefaultTbQueueMsg msg = gson.fromJson(new String(message.getBody()), DefaultTbQueueMsg.class);
|
||||
return decoder.decode(msg);
|
||||
|
||||
@ -30,6 +30,8 @@ import org.thingsboard.server.queue.TbQueueProducer;
|
||||
import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
@ -39,10 +41,12 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
|
||||
private final Gson gson = new Gson();
|
||||
private final TbQueueAdmin admin;
|
||||
private final TbRabbitMqSettings rabbitMqSettings;
|
||||
private ListeningExecutorService producerExecutor;
|
||||
private final ListeningExecutorService producerExecutor;
|
||||
private final Channel channel;
|
||||
private final Connection connection;
|
||||
|
||||
private final Set<TopicPartitionInfo> topics = ConcurrentHashMap.newKeySet();
|
||||
|
||||
public TbRabbitMqProducerTemplate(TbQueueAdmin admin, TbRabbitMqSettings rabbitMqSettings, String defaultTopic) {
|
||||
this.admin = admin;
|
||||
this.defaultTopic = defaultTopic;
|
||||
@ -75,6 +79,7 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
|
||||
|
||||
@Override
|
||||
public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
|
||||
createTopicIfNotExist(tpi);
|
||||
AMQP.BasicProperties properties = new AMQP.BasicProperties();
|
||||
try {
|
||||
channel.basicPublish(rabbitMqSettings.getExchangeName(), tpi.getFullTopicName(), properties, gson.toJson(new DefaultTbQueueMsg(msg)).getBytes());
|
||||
@ -110,4 +115,11 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
|
||||
}
|
||||
}
|
||||
|
||||
private void createTopicIfNotExist(TopicPartitionInfo tpi) {
|
||||
if (topics.contains(tpi)) {
|
||||
return;
|
||||
}
|
||||
admin.createTopicIfNotExists(tpi.getFullTopicName());
|
||||
topics.add(tpi);
|
||||
}
|
||||
}
|
||||
|
||||
@ -25,21 +25,17 @@ import com.amazonaws.services.sqs.model.Message;
|
||||
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.queue.TbQueueAdmin;
|
||||
import org.thingsboard.server.queue.TbQueueConsumer;
|
||||
import org.thingsboard.server.queue.TbQueueMsg;
|
||||
import org.thingsboard.server.queue.TbQueueMsgDecoder;
|
||||
import org.thingsboard.server.queue.common.AbstractParallelTbQueueConsumerTemplate;
|
||||
import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
@ -47,34 +43,28 @@ import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@Slf4j
|
||||
public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> {
|
||||
public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> extends AbstractParallelTbQueueConsumerTemplate<Message, T> {
|
||||
|
||||
private static final int MAX_NUM_MSGS = 10;
|
||||
|
||||
private final Gson gson = new Gson();
|
||||
private final TbQueueAdmin admin;
|
||||
private final AmazonSQS sqsClient;
|
||||
private final String topic;
|
||||
private final TbQueueMsgDecoder<T> decoder;
|
||||
private final TbAwsSqsSettings sqsSettings;
|
||||
|
||||
private final List<AwsSqsMsgWrapper> pendingMessages = new CopyOnWriteArrayList<>();
|
||||
private volatile Set<String> queueUrls;
|
||||
private volatile Set<TopicPartitionInfo> partitions;
|
||||
private ListeningExecutorService consumerExecutor;
|
||||
private volatile boolean subscribed;
|
||||
private volatile boolean stopped = false;
|
||||
|
||||
public TbAwsSqsConsumerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String topic, TbQueueMsgDecoder<T> decoder) {
|
||||
super(topic);
|
||||
this.admin = admin;
|
||||
this.decoder = decoder;
|
||||
this.topic = topic;
|
||||
this.sqsSettings = sqsSettings;
|
||||
|
||||
AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey());
|
||||
@ -87,81 +77,60 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTopic() {
|
||||
return topic;
|
||||
protected void doSubscribe(List<String> topicNames) {
|
||||
queueUrls = topicNames.stream().map(this::getQueueUrl).collect(Collectors.toSet());
|
||||
initNewExecutor(queueUrls.size() * sqsSettings.getThreadsPerTopic() + 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe() {
|
||||
partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));
|
||||
subscribed = false;
|
||||
protected List<Message> doPoll(long durationInMillis) {
|
||||
int duration = (int) TimeUnit.MILLISECONDS.toSeconds(durationInMillis);
|
||||
List<ListenableFuture<List<Message>>> futureList = queueUrls
|
||||
.stream()
|
||||
.map(url -> poll(url, duration))
|
||||
.collect(Collectors.toList());
|
||||
ListenableFuture<List<List<Message>>> futureResult = Futures.allAsList(futureList);
|
||||
try {
|
||||
return futureResult.get().stream()
|
||||
.flatMap(List::stream)
|
||||
.filter(Objects::nonNull)
|
||||
.collect(Collectors.toList());
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
if (stopped) {
|
||||
log.info("[{}] Aws SQS consumer is stopped.", getTopic());
|
||||
} else {
|
||||
log.error("Failed to pool messages.", e);
|
||||
}
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(Set<TopicPartitionInfo> partitions) {
|
||||
this.partitions = partitions;
|
||||
subscribed = false;
|
||||
public T decode(Message message) throws InvalidProtocolBufferException {
|
||||
DefaultTbQueueMsg msg = gson.fromJson(message.getBody(), DefaultTbQueueMsg.class);
|
||||
return decoder.decode(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unsubscribe() {
|
||||
protected void doCommit() {
|
||||
pendingMessages.forEach(msg ->
|
||||
consumerExecutor.submit(() -> {
|
||||
List<DeleteMessageBatchRequestEntry> entries = msg.getMessages()
|
||||
.stream()
|
||||
.map(message -> new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getReceiptHandle()))
|
||||
.collect(Collectors.toList());
|
||||
sqsClient.deleteMessageBatch(msg.getUrl(), entries);
|
||||
}));
|
||||
pendingMessages.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doUnsubscribe() {
|
||||
stopped = true;
|
||||
|
||||
if (sqsClient != null) {
|
||||
sqsClient.shutdown();
|
||||
}
|
||||
if (consumerExecutor != null) {
|
||||
consumerExecutor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<T> poll(long durationInMillis) {
|
||||
if (!subscribed && partitions == null) {
|
||||
try {
|
||||
Thread.sleep(durationInMillis);
|
||||
} catch (InterruptedException e) {
|
||||
log.debug("Failed to await subscription", e);
|
||||
}
|
||||
} else {
|
||||
if (!subscribed) {
|
||||
List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
|
||||
queueUrls = topicNames.stream().map(this::getQueueUrl).collect(Collectors.toSet());
|
||||
consumerExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(queueUrls.size() * sqsSettings.getThreadsPerTopic() + 1));
|
||||
subscribed = true;
|
||||
}
|
||||
|
||||
if (!pendingMessages.isEmpty()) {
|
||||
log.warn("Present {} non committed messages.", pendingMessages.size());
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
List<ListenableFuture<List<Message>>> futureList = queueUrls
|
||||
.stream()
|
||||
.map(url -> poll(url, (int) TimeUnit.MILLISECONDS.toSeconds(durationInMillis)))
|
||||
.collect(Collectors.toList());
|
||||
ListenableFuture<List<List<Message>>> futureResult = Futures.allAsList(futureList);
|
||||
try {
|
||||
return futureResult.get().stream()
|
||||
.flatMap(List::stream)
|
||||
.map(msg -> {
|
||||
try {
|
||||
return decode(msg);
|
||||
} catch (IOException e) {
|
||||
log.error("Failed to decode message: [{}]", msg);
|
||||
return null;
|
||||
}
|
||||
}).filter(Objects::nonNull)
|
||||
.collect(Collectors.toList());
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
if (stopped) {
|
||||
log.info("[{}] Aws SQS consumer is stopped.", topic);
|
||||
} else {
|
||||
log.error("Failed to pool messages.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return Collections.emptyList();
|
||||
shutdownExecutor();
|
||||
}
|
||||
|
||||
private ListenableFuture<List<Message>> poll(String url, int waitTimeSeconds) {
|
||||
@ -172,7 +141,6 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
|
||||
ReceiveMessageRequest request = new ReceiveMessageRequest();
|
||||
request
|
||||
.withWaitTimeSeconds(waitTimeSeconds)
|
||||
.withMessageAttributeNames("headers")
|
||||
.withQueueUrl(url)
|
||||
.withMaxNumberOfMessages(MAX_NUM_MSGS);
|
||||
return sqsClient.receiveMessage(request).getMessages();
|
||||
@ -194,25 +162,6 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
|
||||
}, consumerExecutor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commit() {
|
||||
pendingMessages.forEach(msg ->
|
||||
consumerExecutor.submit(() -> {
|
||||
List<DeleteMessageBatchRequestEntry> entries = msg.getMessages()
|
||||
.stream()
|
||||
.map(message -> new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getReceiptHandle()))
|
||||
.collect(Collectors.toList());
|
||||
sqsClient.deleteMessageBatch(msg.getUrl(), entries);
|
||||
}));
|
||||
|
||||
pendingMessages.clear();
|
||||
}
|
||||
|
||||
public T decode(Message message) throws InvalidProtocolBufferException {
|
||||
DefaultTbQueueMsg msg = gson.fromJson(message.getBody(), DefaultTbQueueMsg.class);
|
||||
return decoder.decode(msg);
|
||||
}
|
||||
|
||||
@Data
|
||||
private static class AwsSqsMsgWrapper {
|
||||
private final String url;
|
||||
|
||||
@ -37,6 +37,7 @@ import org.thingsboard.server.queue.TbQueueProducer;
|
||||
import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
@ -80,7 +81,9 @@ public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
|
||||
sendMsgRequest.withQueueUrl(getQueueUrl(tpi.getFullTopicName()));
|
||||
sendMsgRequest.withMessageBody(gson.toJson(new DefaultTbQueueMsg(msg)));
|
||||
|
||||
sendMsgRequest.withMessageGroupId(msg.getKey().toString());
|
||||
sendMsgRequest.withMessageGroupId(tpi.getTopic());
|
||||
sendMsgRequest.withMessageDeduplicationId(UUID.randomUUID().toString());
|
||||
|
||||
ListenableFuture<SendMessageResult> future = producerExecutor.submit(() -> sqsClient.sendMessage(sendMsgRequest));
|
||||
|
||||
Futures.addCallback(future, new FutureCallback<SendMessageResult>() {
|
||||
|
||||
@ -55,7 +55,6 @@ public class TbAwsSqsQueueAttributes {
|
||||
@PostConstruct
|
||||
private void init() {
|
||||
defaultAttributes.put(QueueAttributeName.FifoQueue.toString(), "true");
|
||||
defaultAttributes.put(QueueAttributeName.ContentBasedDeduplication.toString(), "true");
|
||||
|
||||
coreAttributes = getConfigs(coreProperties);
|
||||
ruleEngineAttributes = getConfigs(ruleEngineProperties);
|
||||
|
||||
@ -31,8 +31,8 @@ public class OAuth2ClientMapperConfig {
|
||||
private String firstNameAttributeKey;
|
||||
private String lastNameAttributeKey;
|
||||
private String tenantNameStrategy;
|
||||
private String tenantNameStrategyPattern;
|
||||
private String customerNameStrategyPattern;
|
||||
private String tenantNamePattern;
|
||||
private String customerNamePattern;
|
||||
}
|
||||
|
||||
@Data
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
|
||||
TB_QUEUE_TYPE=kafka
|
||||
REMOTE_JS_EVAL_REQUEST_TOPIC=js_eval.requests
|
||||
TB_KAFKA_SERVERS=kafka:9092
|
||||
LOGGER_LEVEL=info
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
service-type: "TB_SERVICE_TYPE" #kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
|
||||
queue_type: "TB_QUEUE_TYPE" #kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
|
||||
request_topic: "REMOTE_JS_EVAL_REQUEST_TOPIC"
|
||||
|
||||
js:
|
||||
@ -25,18 +25,18 @@ kafka:
|
||||
# Kafka Bootstrap Servers
|
||||
servers: "TB_KAFKA_SERVERS"
|
||||
replication_factor: "TB_QUEUE_KAFKA_REPLICATION_FACTOR"
|
||||
topic-properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES"
|
||||
topic_properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES"
|
||||
|
||||
pubsub:
|
||||
project_id: "TB_QUEUE_PUBSUB_PROJECT_ID"
|
||||
service_account: "TB_QUEUE_PUBSUB_SERVICE_ACCOUNT"
|
||||
queue-properties: "TB_QUEUE_PUBSUB_JE_QUEUE_PROPERTIES"
|
||||
queue_properties: "TB_QUEUE_PUBSUB_JE_QUEUE_PROPERTIES"
|
||||
|
||||
aws_sqs:
|
||||
access_key_id: "TB_QUEUE_AWS_SQS_ACCESS_KEY_ID"
|
||||
secret_access_key: "TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY"
|
||||
region: "TB_QUEUE_AWS_SQS_REGION"
|
||||
queue-properties: "TB_QUEUE_AWS_SQS_JE_QUEUE_PROPERTIES"
|
||||
queue_properties: "TB_QUEUE_AWS_SQS_JE_QUEUE_PROPERTIES"
|
||||
|
||||
rabbitmq:
|
||||
host: "TB_QUEUE_RABBIT_MQ_HOST"
|
||||
@ -44,14 +44,14 @@ rabbitmq:
|
||||
virtual_host: "TB_QUEUE_RABBIT_MQ_VIRTUAL_HOST"
|
||||
username: "TB_QUEUE_RABBIT_MQ_USERNAME"
|
||||
password: "TB_QUEUE_RABBIT_MQ_PASSWORD"
|
||||
queue-properties: "TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES"
|
||||
queue_properties: "TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES"
|
||||
|
||||
service_bus:
|
||||
namespace_name: "TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME"
|
||||
sas_key_name: "TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME"
|
||||
sas_key: "TB_QUEUE_SERVICE_BUS_SAS_KEY"
|
||||
max_messages: "TB_QUEUE_SERVICE_BUS_MAX_MESSAGES"
|
||||
queue-properties: "TB_QUEUE_SERVICE_BUS_JE_QUEUE_PROPERTIES"
|
||||
queue_properties: "TB_QUEUE_SERVICE_BUS_JE_QUEUE_PROPERTIES"
|
||||
|
||||
logger:
|
||||
level: "LOGGER_LEVEL"
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
service-type: "kafka"
|
||||
queue_type: "kafka"
|
||||
request_topic: "js_eval.requests"
|
||||
|
||||
js:
|
||||
@ -25,13 +25,13 @@ kafka:
|
||||
# Kafka Bootstrap Servers
|
||||
servers: "localhost:9092"
|
||||
replication_factor: "1"
|
||||
topic-properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600"
|
||||
topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600"
|
||||
|
||||
pubsub:
|
||||
queue-properties: "ackDeadlineInSec:30;messageRetentionInSec:604800"
|
||||
queue_properties: "ackDeadlineInSec:30;messageRetentionInSec:604800"
|
||||
|
||||
aws_sqs:
|
||||
queue-properties: "VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800"
|
||||
queue_properties: "VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800"
|
||||
|
||||
rabbitmq:
|
||||
host: "localhost"
|
||||
@ -39,10 +39,10 @@ rabbitmq:
|
||||
virtual_host: "/"
|
||||
username: "admin"
|
||||
password: "password"
|
||||
queue-properties: "x-max-length-bytes:1048576000;x-message-ttl:604800000"
|
||||
queue_properties: "x-max-length-bytes:1048576000;x-message-ttl:604800000"
|
||||
|
||||
service_bus:
|
||||
queue-properties: "lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800"
|
||||
queue_properties: "lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800"
|
||||
|
||||
logger:
|
||||
level: "info"
|
||||
|
||||
@ -22,6 +22,7 @@
|
||||
"azure-sb": "^0.11.1",
|
||||
"long": "^4.0.0",
|
||||
"uuid-parse": "^1.0.0",
|
||||
"uuid-random": "^1.3.0",
|
||||
"winston": "^3.0.0",
|
||||
"winston-daily-rotate-file": "^3.2.1"
|
||||
},
|
||||
|
||||
@ -19,6 +19,7 @@
|
||||
const config = require('config'),
|
||||
JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'),
|
||||
logger = require('../config/logger')._logger('awsSqsTemplate');
|
||||
const uuid = require('uuid-random');
|
||||
|
||||
const requestTopic = config.get('request_topic');
|
||||
|
||||
@ -26,10 +27,10 @@ const accessKeyId = config.get('aws_sqs.access_key_id');
|
||||
const secretAccessKey = config.get('aws_sqs.secret_access_key');
|
||||
const region = config.get('aws_sqs.region');
|
||||
const AWS = require('aws-sdk');
|
||||
const queueProperties = config.get('aws_sqs.queue-properties');
|
||||
const poolInterval = config.get('js.response_poll_interval');
|
||||
const queueProperties = config.get('aws_sqs.queue_properties');
|
||||
const pollInterval = config.get('js.response_poll_interval');
|
||||
|
||||
let queueAttributes = {FifoQueue: 'true', ContentBasedDeduplication: 'true'};
|
||||
let queueAttributes = {FifoQueue: 'true'};
|
||||
let sqsClient;
|
||||
let requestQueueURL;
|
||||
const queueUrls = new Map();
|
||||
@ -51,7 +52,12 @@ function AwsSqsProducer() {
|
||||
queueUrls.set(responseTopic, responseQueueUrl);
|
||||
}
|
||||
|
||||
let params = {MessageBody: msgBody, QueueUrl: responseQueueUrl, MessageGroupId: scriptId};
|
||||
let params = {
|
||||
MessageBody: msgBody,
|
||||
QueueUrl: responseQueueUrl,
|
||||
MessageGroupId: 'js_eval',
|
||||
MessageDeduplicationId: uuid()
|
||||
};
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
sqsClient.sendMessage(params, function (err, data) {
|
||||
@ -74,11 +80,13 @@ function AwsSqsProducer() {
|
||||
|
||||
const queues = await getQueues();
|
||||
|
||||
queues.forEach(queueUrl => {
|
||||
const delimiterPosition = queueUrl.lastIndexOf('/');
|
||||
const queueName = queueUrl.substring(delimiterPosition + 1);
|
||||
queueUrls.set(queueName, queueUrl);
|
||||
})
|
||||
if (queues) {
|
||||
queues.forEach(queueUrl => {
|
||||
const delimiterPosition = queueUrl.lastIndexOf('/');
|
||||
const queueName = queueUrl.substring(delimiterPosition + 1);
|
||||
queueUrls.set(queueName, queueUrl);
|
||||
});
|
||||
}
|
||||
|
||||
parseQueueProperties();
|
||||
|
||||
@ -95,6 +103,7 @@ function AwsSqsProducer() {
|
||||
WaitTimeSeconds: poolInterval / 1000
|
||||
};
|
||||
while (!stopped) {
|
||||
let pollStartTs = new Date().getTime();
|
||||
const messages = await new Promise((resolve, reject) => {
|
||||
sqsClient.receiveMessage(params, function (err, data) {
|
||||
if (err) {
|
||||
@ -127,6 +136,11 @@ function AwsSqsProducer() {
|
||||
//do nothing
|
||||
}
|
||||
});
|
||||
} else {
|
||||
let pollDuration = new Date().getTime() - pollStartTs;
|
||||
if (pollDuration < pollInterval) {
|
||||
await sleep(pollInterval - pollDuration);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
@ -175,6 +189,12 @@ function parseQueueProperties() {
|
||||
});
|
||||
}
|
||||
|
||||
function sleep(ms) {
|
||||
return new Promise((resolve) => {
|
||||
setTimeout(resolve, ms);
|
||||
});
|
||||
}
|
||||
|
||||
process.on('exit', () => {
|
||||
stopped = true;
|
||||
logger.info('Aws Sqs client stopped.');
|
||||
|
||||
@ -20,7 +20,7 @@ const config = require('config'),
|
||||
logger = require('../config/logger')._logger('kafkaTemplate'),
|
||||
KafkaJsWinstonLogCreator = require('../config/logger').KafkaJsWinstonLogCreator;
|
||||
const replicationFactor = config.get('kafka.replication_factor');
|
||||
const topicProperties = config.get('kafka.topic-properties');
|
||||
const topicProperties = config.get('kafka.topic_properties');
|
||||
|
||||
let kafkaClient;
|
||||
let kafkaAdmin;
|
||||
|
||||
@ -24,7 +24,7 @@ const {PubSub} = require('@google-cloud/pubsub');
|
||||
const projectId = config.get('pubsub.project_id');
|
||||
const credentials = JSON.parse(config.get('pubsub.service_account'));
|
||||
const requestTopic = config.get('request_topic');
|
||||
const queueProperties = config.get('pubsub.queue-properties');
|
||||
const queueProperties = config.get('pubsub.queue_properties');
|
||||
|
||||
let pubSubClient;
|
||||
|
||||
@ -98,23 +98,32 @@ function PubSubProducer() {
|
||||
|
||||
async function createTopic(topic) {
|
||||
if (!topics.includes(topic)) {
|
||||
await pubSubClient.createTopic(topic);
|
||||
try {
|
||||
await pubSubClient.createTopic(topic);
|
||||
logger.info('Created new Pub/Sub topic: %s', topic);
|
||||
} catch (e) {
|
||||
logger.info('Pub/Sub topic already exists');
|
||||
}
|
||||
topics.push(topic);
|
||||
logger.info('Created new Pub/Sub topic: %s', topic);
|
||||
}
|
||||
await createSubscription(topic)
|
||||
}
|
||||
|
||||
async function createSubscription(topic) {
|
||||
if (!subscriptions.includes(topic)) {
|
||||
await pubSubClient.createSubscription(topic, topic, {
|
||||
topic: topic,
|
||||
subscription: topic,
|
||||
ackDeadlineSeconds: queueProps['ackDeadlineInSec'],
|
||||
messageRetentionDuration: {seconds: queueProps['messageRetentionInSec']}
|
||||
});
|
||||
try {
|
||||
await pubSubClient.createSubscription(topic, topic, {
|
||||
topic: topic,
|
||||
subscription: topic,
|
||||
ackDeadlineSeconds: queueProps['ackDeadlineInSec'],
|
||||
messageRetentionDuration: {seconds: queueProps['messageRetentionInSec']}
|
||||
});
|
||||
logger.info('Created new Pub/Sub subscription: %s', topic);
|
||||
} catch (e) {
|
||||
logger.info('Pub/Sub subscription already exists.');
|
||||
}
|
||||
|
||||
subscriptions.push(topic);
|
||||
logger.info('Created new Pub/Sub subscription: %s', topic);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -26,23 +26,23 @@ const port = config.get('rabbitmq.port');
|
||||
const vhost = config.get('rabbitmq.virtual_host');
|
||||
const username = config.get('rabbitmq.username');
|
||||
const password = config.get('rabbitmq.password');
|
||||
const queueProperties = config.get('rabbitmq.queue-properties');
|
||||
const poolInterval = config.get('js.response_poll_interval');
|
||||
const queueProperties = config.get('rabbitmq.queue_properties');
|
||||
const pollInterval = config.get('js.response_poll_interval');
|
||||
|
||||
const amqp = require('amqplib/callback_api');
|
||||
|
||||
let queueParams = {durable: false, exclusive: false, autoDelete: false};
|
||||
let queueOptions = {durable: false, exclusive: false, autoDelete: false};
|
||||
let connection;
|
||||
let channel;
|
||||
let stopped = false;
|
||||
const responseTopics = [];
|
||||
let queues = [];
|
||||
|
||||
function RabbitMqProducer() {
|
||||
this.send = async (responseTopic, scriptId, rawResponse, headers) => {
|
||||
|
||||
if (!responseTopics.includes(responseTopic)) {
|
||||
if (!queues.includes(responseTopic)) {
|
||||
await createQueue(responseTopic);
|
||||
responseTopics.push(responseTopic);
|
||||
queues.push(responseTopic);
|
||||
}
|
||||
|
||||
let data = JSON.stringify(
|
||||
@ -98,6 +98,7 @@ function RabbitMqProducer() {
|
||||
const messageProcessor = new JsInvokeMessageProcessor(new RabbitMqProducer());
|
||||
|
||||
while (!stopped) {
|
||||
let pollStartTs = new Date().getTime();
|
||||
let message = await new Promise((resolve, reject) => {
|
||||
channel.get(requestTopic, {}, function (err, msg) {
|
||||
if (err) {
|
||||
@ -112,7 +113,10 @@ function RabbitMqProducer() {
|
||||
messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8')));
|
||||
channel.ack(message);
|
||||
} else {
|
||||
await sleep(poolInterval);
|
||||
let pollDuration = new Date().getTime() - pollStartTs;
|
||||
if (pollDuration < pollInterval) {
|
||||
await sleep(pollInterval - pollDuration);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
@ -123,16 +127,18 @@ function RabbitMqProducer() {
|
||||
})();
|
||||
|
||||
function parseQueueProperties() {
|
||||
let args = {};
|
||||
const props = queueProperties.split(';');
|
||||
props.forEach(p => {
|
||||
const delimiterPosition = p.indexOf(':');
|
||||
queueParams[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1);
|
||||
args[p.substring(0, delimiterPosition)] = +p.substring(delimiterPosition + 1);
|
||||
});
|
||||
queueOptions['arguments'] = args;
|
||||
}
|
||||
|
||||
function createQueue(topic) {
|
||||
async function createQueue(topic) {
|
||||
return new Promise((resolve, reject) => {
|
||||
channel.assertQueue(topic, queueParams, function (err) {
|
||||
channel.assertQueue(topic, queueOptions, function (err) {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
|
||||
@ -26,7 +26,7 @@ const requestTopic = config.get('request_topic');
|
||||
const namespaceName = config.get('service_bus.namespace_name');
|
||||
const sasKeyName = config.get('service_bus.sas_key_name');
|
||||
const sasKey = config.get('service_bus.sas_key');
|
||||
const queueProperties = config.get('service_bus.queue-properties');
|
||||
const queueProperties = config.get('service_bus.queue_properties');
|
||||
|
||||
let sbClient;
|
||||
let receiverClient;
|
||||
@ -140,6 +140,7 @@ function parseQueueProperties() {
|
||||
properties[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1);
|
||||
});
|
||||
queueOptions = {
|
||||
DuplicateDetection: 'false',
|
||||
MaxSizeInMegabytes: properties['maxSizeInMb'],
|
||||
DefaultMessageTimeToLive: `PT${properties['messageTimeToLiveInSec']}S`,
|
||||
LockDuration: `PT${properties['lockDurationInSec']}S`
|
||||
|
||||
@ -16,7 +16,7 @@
|
||||
|
||||
const config = require('config'), logger = require('./config/logger')._logger('main');
|
||||
|
||||
const serviceType = config.get('service-type');
|
||||
const serviceType = config.get('queue_type');
|
||||
switch (serviceType) {
|
||||
case 'kafka':
|
||||
logger.info('Starting kafka template.');
|
||||
|
||||
2
pom.xml
2
pom.xml
@ -96,7 +96,7 @@
|
||||
<snakeyaml.version>1.25</snakeyaml.version>
|
||||
<struts.version>1.3.10</struts.version>
|
||||
<amazonaws.sqs.version>1.11.747</amazonaws.sqs.version>
|
||||
<pubsub.client.version>1.84.0</pubsub.client.version>
|
||||
<pubsub.client.version>1.105.0</pubsub.client.version>
|
||||
<azure-servicebus.version>3.2.0</azure-servicebus.version>
|
||||
<passay.version>1.5.0</passay.version>
|
||||
<ua-parser.version>1.4.3</ua-parser.version>
|
||||
|
||||
@ -81,8 +81,8 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode<TbClearAlarmNodeConfig
|
||||
}
|
||||
alarm.setStatus(alarm.getStatus().isAck() ? AlarmStatus.CLEARED_ACK : AlarmStatus.CLEARED_UNACK);
|
||||
return Futures.immediateFuture(new AlarmResult(false, false, true, alarm));
|
||||
}, MoreExecutors.directExecutor());
|
||||
}, MoreExecutors.directExecutor());
|
||||
}, ctx.getDbCallbackExecutor());
|
||||
}, ctx.getDbCallbackExecutor());
|
||||
}, ctx.getDbCallbackExecutor());
|
||||
}
|
||||
}
|
||||
|
||||
@ -1149,7 +1149,7 @@
|
||||
"total": "Gesamt"
|
||||
},
|
||||
"login": {
|
||||
"login": "Login",
|
||||
"login": "Anmelden",
|
||||
"request-password-reset": "Passwortzurücksetzung anfordern",
|
||||
"reset-password": "Passwort zurücksetzen",
|
||||
"create-password": "Passwort erstellen",
|
||||
@ -1163,7 +1163,9 @@
|
||||
"new-password": "Neues Passwort",
|
||||
"new-password-again": "Neues Passwort wiederholen",
|
||||
"password-link-sent-message": "Der Link zum Zurücksetzen des Passworts wurde erfolgreich versendet!",
|
||||
"email": "E-Mail"
|
||||
"email": "E-Mail",
|
||||
"login-with": "Mit {{name}} anmelden",
|
||||
"or": "oder"
|
||||
},
|
||||
"position": {
|
||||
"top": "Oben",
|
||||
|
||||
@ -1705,7 +1705,9 @@
|
||||
"password-link-sent-message": "Ο σύνδεσμος επαναφοράς κωδικού πρόσβασης στάλθηκε με επιτυχία!",
|
||||
"email": "Email",
|
||||
"no-account": "Δεν έχετε λογαριασμό;",
|
||||
"create-account": "Δημιουργία λογαριασμού"
|
||||
"create-account": "Δημιουργία λογαριασμού",
|
||||
"login-with": "Σύνδεση μέσω {{name}}",
|
||||
"or": "ή"
|
||||
},
|
||||
"signup": {
|
||||
"firstname": "Όνομα",
|
||||
|
||||
@ -1229,7 +1229,9 @@
|
||||
"new-password": "Nueva contraseña",
|
||||
"new-password-again": "Repita la nueva contraseña",
|
||||
"password-link-sent-message": "¡El enlace para el restablecer la contraseña fue enviado correctamente!",
|
||||
"email": "Correo electrónico"
|
||||
"email": "Correo electrónico",
|
||||
"login-with": "Iniciar sesión con {{name}}",
|
||||
"or": "o"
|
||||
},
|
||||
"position": {
|
||||
"top": "Superior",
|
||||
|
||||
@ -1197,7 +1197,7 @@
|
||||
"create-password": "Créer un mot de passe",
|
||||
"email": "Email",
|
||||
"forgot-password": "Mot de passe oublié?",
|
||||
"login": "Login",
|
||||
"login": "Connexion",
|
||||
"new-password": "Nouveau mot de passe",
|
||||
"new-password-again": "nouveau mot de passe",
|
||||
"password-again": "Mot de passe à nouveau",
|
||||
@ -1208,7 +1208,9 @@
|
||||
"request-password-reset": "Demander la réinitialisation du mot de passe",
|
||||
"reset-password": "Réinitialiser le mot de passe",
|
||||
"sign-in": "Veuillez vous connecter",
|
||||
"username": "Nom d'utilisateur (courriel)"
|
||||
"username": "Nom d'utilisateur (courriel)",
|
||||
"login-with": "Se connecter avec {{name}}",
|
||||
"or": "ou"
|
||||
},
|
||||
"position": {
|
||||
"bottom": "Bas",
|
||||
|
||||
@ -1160,7 +1160,7 @@
|
||||
"total": "totale"
|
||||
},
|
||||
"login": {
|
||||
"login": "Login",
|
||||
"login": "Accedi",
|
||||
"request-password-reset": "Richiesta reset password",
|
||||
"reset-password": "Reset Password",
|
||||
"create-password": "Crea Password",
|
||||
@ -1174,7 +1174,9 @@
|
||||
"new-password": "Nuova password",
|
||||
"new-password-again": "Ripeti nuova password",
|
||||
"password-link-sent-message": "Link reset password inviato con successo!",
|
||||
"email": "Email"
|
||||
"email": "Email",
|
||||
"login-with": "Accedi con {{name}}",
|
||||
"or": "o"
|
||||
},
|
||||
"position": {
|
||||
"top": "Alto",
|
||||
|
||||
@ -1024,7 +1024,9 @@
|
||||
"new-password": "新しいパスワード",
|
||||
"new-password-again": "新しいパスワードを再入力",
|
||||
"password-link-sent-message": "パスワードリセットリンクが正常に送信されました!",
|
||||
"email": "Eメール"
|
||||
"email": "Eメール",
|
||||
"login-with": "{{name}}でログイン",
|
||||
"or": "または"
|
||||
},
|
||||
"position": {
|
||||
"top": "上",
|
||||
|
||||
@ -934,7 +934,9 @@
|
||||
"new-password": "새 비밀번호",
|
||||
"new-password-again": "새 비밀번호 확인",
|
||||
"password-link-sent-message": "비밀번호 재설정 링크가 성공적으로 전송되었습니다!",
|
||||
"email": "이메일"
|
||||
"email": "이메일",
|
||||
"login-with": "{{name}}으로 로그인",
|
||||
"or": "또는"
|
||||
},
|
||||
"position": {
|
||||
"top": "상단",
|
||||
|
||||
@ -1230,7 +1230,7 @@
|
||||
}
|
||||
},
|
||||
"login": {
|
||||
"login": "Intră în Cont",
|
||||
"login": "Conectare",
|
||||
"request-password-reset": "Solicită Resetarea Parolei",
|
||||
"reset-password": "Resetează Parolă",
|
||||
"create-password": "Creează Parolă",
|
||||
@ -1245,7 +1245,9 @@
|
||||
"new-password": "Parolă nouă",
|
||||
"new-password-again": "Verificare parolă nouă",
|
||||
"password-link-sent-message": "Ți-am trimis pe eMail un link pentru resetarea parolei",
|
||||
"email": "eMail"
|
||||
"email": "eMail",
|
||||
"login-with": "Conectare cu {{name}}",
|
||||
"or": "sau"
|
||||
},
|
||||
"position": {
|
||||
"top": "Sus",
|
||||
|
||||
@ -1090,7 +1090,7 @@
|
||||
"total": "toplam"
|
||||
},
|
||||
"login": {
|
||||
"login": "Oturum aç",
|
||||
"login": "Giriş Yap",
|
||||
"request-password-reset": "Parola Sıfırlama İsteği Gönder",
|
||||
"reset-password": "Parola Sıfırla",
|
||||
"create-password": "Parola Oluştur",
|
||||
@ -1104,7 +1104,9 @@
|
||||
"new-password": "Yeni parola",
|
||||
"new-password-again": "Yeni parola tekrarı",
|
||||
"password-link-sent-message": "Parola sıfırlama e-postası başarıyla gönderildi!",
|
||||
"email": "E-posta"
|
||||
"email": "E-posta",
|
||||
"login-with": "{{name}} ile Giriş Yap",
|
||||
"or": "ya da"
|
||||
},
|
||||
"position": {
|
||||
"top": "Üst",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user