This commit is contained in:
Viacheslav Klimov 2021-03-25 10:06:06 +02:00
parent afab5150b8
commit 306ace040c
13 changed files with 257 additions and 167 deletions

View File

@ -647,6 +647,11 @@ transport:
redis.enabled: "${LWM2M_REDIS_ENABLED:false}" redis.enabled: "${LWM2M_REDIS_ENABLED:false}"
snmp: snmp:
enabled: "${SNMP_ENABLED:true}" enabled: "${SNMP_ENABLED:true}"
response_processing:
# parallelism level for executor (workStealingPool) that is responsible for handling responses from SNMP devices
parallelism_level: "${SNMP_RESPONSE_PROCESSING_PARALLELISM_LEVEL:20}"
# to configure SNMP to work over UDP or TCP
underlying_protocol: "${SNMP_UNDERLYING_PROTOCOL:udp}"
swagger: swagger:
api_path_regex: "${SWAGGER_API_PATH_REGEX:/api.*}" api_path_regex: "${SWAGGER_API_PATH_REGEX:/api.*}"

View File

@ -1,39 +0,0 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.device;
import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.HashMap;
import java.util.Map;
public abstract class JsonBasedTransportConfiguration {
@JsonIgnore
private final Map<String, Object> properties = new HashMap<>();
@JsonAnyGetter
public Map<String, Object> properties() {
return this.properties;
}
@JsonAnySetter
public void put(String name, Object value) {
this.properties.put(name, value);
}
}

View File

@ -58,5 +58,11 @@
<groupId>org.snmp4j</groupId> <groupId>org.snmp4j</groupId>
<artifactId>snmp4j</artifactId> <artifactId>snmp4j</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.snmp4j</groupId>
<artifactId>snmp4j-agent</artifactId>
<version>3.3.6</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -1,81 +0,0 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.snmp;
import org.snmp4j.CommunityTarget;
import org.snmp4j.PDU;
import org.snmp4j.Snmp;
import org.snmp4j.Target;
import org.snmp4j.event.ResponseEvent;
import org.snmp4j.mp.SnmpConstants;
import org.snmp4j.smi.GenericAddress;
import org.snmp4j.smi.OID;
import org.snmp4j.smi.OctetString;
import org.snmp4j.smi.VariableBinding;
import org.snmp4j.transport.DefaultUdpTransportMapping;
import org.snmp4j.transport.UdpTransportMapping;
import java.io.IOException;
/**
* For testing purposes. Will be removed when the time comes
*/
public class SnmpDeviceSimulator {
private final Target target;
private final OID oid = new OID(".1.3.6.1.2.1.1.1.0");
private Snmp snmp;
public SnmpDeviceSimulator(int port) {
String address = "udp:127.0.0.1/" + port;
CommunityTarget target = new CommunityTarget();
target.setCommunity(new OctetString("public"));
target.setAddress(GenericAddress.parse(address));
target.setRetries(2);
target.setTimeout(1500);
target.setVersion(SnmpConstants.version2c);
this.target = target;
}
public static void main(String[] args) throws IOException {
SnmpDeviceSimulator deviceSimulator = new SnmpDeviceSimulator(161);
deviceSimulator.start();
String response = deviceSimulator.sendRequest(PDU.GET);
System.out.println(response);
}
public void start() throws IOException {
UdpTransportMapping transport = new DefaultUdpTransportMapping();
transport.addTransportListener((sourceTransport, incomingAddress, wholeMessage, tmStateReference) -> {
System.out.println();
});
snmp = new Snmp(transport);
transport.listen();
}
public String sendRequest(int pduType) throws IOException {
PDU pdu = new PDU();
pdu.add(new VariableBinding(oid));
pdu.setType(pduType);
ResponseEvent responseEvent = snmp.send(pdu, target);
return responseEvent.getResponse().get(0).getVariable().toString();
}
}

View File

@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j;
import org.snmp4j.PDU; import org.snmp4j.PDU;
import org.snmp4j.smi.OID; import org.snmp4j.smi.OID;
import org.snmp4j.smi.VariableBinding; import org.snmp4j.smi.VariableBinding;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
@ -79,6 +80,9 @@ public class SnmpTransportContext extends TransportContext {
private final Map<DeviceProfileId, List<PDU>> profilesPdus = new ConcurrentHashMap<>(); private final Map<DeviceProfileId, List<PDU>> profilesPdus = new ConcurrentHashMap<>();
private Collection<DeviceId> allSnmpDevicesIds = new ConcurrentLinkedDeque<>(); private Collection<DeviceId> allSnmpDevicesIds = new ConcurrentLinkedDeque<>();
@Value("${transport.snmp.underlying_protocol}")
private String snmpUnderlyingProtocol;
@AfterStartUp(order = 2) @AfterStartUp(order = 2)
public void initDevicesSessions() { public void initDevicesSessions() {
log.info("Initializing SNMP devices sessions"); log.info("Initializing SNMP devices sessions");
@ -124,9 +128,9 @@ public class SnmpTransportContext extends TransportContext {
profilesPdus.computeIfAbsent(deviceProfileId, id -> createPdus(profileTransportConfiguration)); profilesPdus.computeIfAbsent(deviceProfileId, id -> createPdus(profileTransportConfiguration));
DeviceSessionContext deviceSessionContext = new DeviceSessionContext( DeviceSessionContext deviceSessionContext = new DeviceSessionContext(
device, deviceProfile, device, deviceProfile, credentials.getCredentialsId(),
credentials.getCredentialsId(), deviceTransportConfiguration, deviceTransportConfiguration, this,
this, snmpTransportService snmpTransportService, snmpUnderlyingProtocol
); );
registerSessionMsgListener(deviceSessionContext); registerSessionMsgListener(deviceSessionContext);
sessions.put(device.getId(), deviceSessionContext); sessions.put(device.getId(), deviceSessionContext);

View File

@ -19,10 +19,13 @@ import com.google.gson.JsonObject;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.snmp4j.PDU; import org.snmp4j.PDU;
import org.snmp4j.Snmp; import org.snmp4j.Snmp;
import org.snmp4j.TransportMapping;
import org.snmp4j.event.ResponseEvent; import org.snmp4j.event.ResponseEvent;
import org.snmp4j.smi.Null; import org.snmp4j.smi.Null;
import org.snmp4j.smi.VariableBinding; import org.snmp4j.smi.VariableBinding;
import org.snmp4j.transport.DefaultTcpTransportMapping;
import org.snmp4j.transport.DefaultUdpTransportMapping; import org.snmp4j.transport.DefaultUdpTransportMapping;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
@ -57,6 +60,11 @@ public class SnmpTransportService implements TbTransportService {
private ScheduledExecutorService pollingExecutor; private ScheduledExecutorService pollingExecutor;
private ExecutorService snmpResponseProcessingExecutor; private ExecutorService snmpResponseProcessingExecutor;
@Value("${transport.snmp.response_processing.parallelism_level}")
private Integer responseProcessingParallelismLevel;
@Value("${transport.snmp.underlying_protocol}")
private String snmpUnderlyingProtocol;
public SnmpTransportService(@Lazy SnmpTransportContext snmpTransportContext, public SnmpTransportService(@Lazy SnmpTransportContext snmpTransportContext,
TransportService transportService) { TransportService transportService) {
this.snmpTransportContext = snmpTransportContext; this.snmpTransportContext = snmpTransportContext;
@ -68,8 +76,7 @@ public class SnmpTransportService implements TbTransportService {
log.info("Initializing SNMP transport service"); log.info("Initializing SNMP transport service");
pollingExecutor = Executors.newScheduledThreadPool(1, ThingsBoardThreadFactory.forName("snmp-polling")); pollingExecutor = Executors.newScheduledThreadPool(1, ThingsBoardThreadFactory.forName("snmp-polling"));
//TODO: Set parallelism value in the config snmpResponseProcessingExecutor = Executors.newWorkStealingPool(responseProcessingParallelismLevel);
snmpResponseProcessingExecutor = Executors.newWorkStealingPool(20);
initializeSnmp(); initializeSnmp();
@ -77,19 +84,28 @@ public class SnmpTransportService implements TbTransportService {
} }
private void initializeSnmp() throws IOException { private void initializeSnmp() throws IOException {
snmp = new Snmp(new DefaultUdpTransportMapping()); TransportMapping<?> transportMapping;
switch (snmpUnderlyingProtocol) {
case "udp":
transportMapping = new DefaultUdpTransportMapping();
break;
case "tcp":
transportMapping = new DefaultTcpTransportMapping();
break;
default:
throw new IllegalArgumentException("Underlying protocol " + snmpUnderlyingProtocol + " for SNMP is not supported");
}
snmp = new Snmp(transportMapping);
snmp.listen(); snmp.listen();
} }
@AfterStartUp(order = 10) @AfterStartUp(order = 10)
public void startPolling() { public void startPolling() {
log.info("Starting SNMP polling"); log.info("Starting SNMP polling");
//TODO: Get poll period from configuration;
int pollPeriodSeconds = 1;
pollingExecutor.scheduleWithFixedDelay(() -> { pollingExecutor.scheduleWithFixedDelay(() -> {
snmpTransportContext.getSessions().forEach(this::executeSnmpRequest); snmpTransportContext.getSessions().forEach(this::executeSnmpRequest);
}, 0, pollPeriodSeconds, TimeUnit.SECONDS); }, 0, 1, TimeUnit.SECONDS);
} }
private void executeSnmpRequest(DeviceSessionContext sessionContext) { private void executeSnmpRequest(DeviceSessionContext sessionContext) {
@ -111,12 +127,12 @@ public class SnmpTransportService implements TbTransportService {
} }
} }
public void onNewDeviceResponse(ResponseEvent responseEvent, DeviceSessionContext sessionContext) { public void onNewDeviceResponse(DeviceSessionContext sessionContext, ResponseEvent responseEvent) {
((Snmp) responseEvent.getSource()).cancel(responseEvent.getRequest(), sessionContext); ((Snmp) responseEvent.getSource()).cancel(responseEvent.getRequest(), sessionContext);
snmpResponseProcessingExecutor.submit(() -> processSnmpResponse(responseEvent, sessionContext)); snmpResponseProcessingExecutor.submit(() -> processSnmpResponse(sessionContext, responseEvent));
} }
private void processSnmpResponse(ResponseEvent event, DeviceSessionContext sessionContext) { private void processSnmpResponse(DeviceSessionContext sessionContext, ResponseEvent event) {
if (event.getError() != null) { if (event.getError() != null) {
log.warn("Response error: {}", event.getError().getMessage(), event.getError()); log.warn("Response error: {}", event.getError().getMessage(), event.getError());
return; return;

View File

@ -65,10 +65,12 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
private long previousRequestExecutedAt = 0; private long previousRequestExecutedAt = 0;
private final AtomicInteger msgIdSeq = new AtomicInteger(0); private final AtomicInteger msgIdSeq = new AtomicInteger(0);
private boolean isActive = true; private boolean isActive = true;
private final String snmpUnderlyingProtocol;
public DeviceSessionContext(Device device, DeviceProfile deviceProfile, public DeviceSessionContext(Device device, DeviceProfile deviceProfile, String token,
String token, SnmpDeviceTransportConfiguration deviceTransportConfiguration, SnmpDeviceTransportConfiguration deviceTransportConfiguration,
SnmpTransportContext snmpTransportContext, SnmpTransportService snmpTransportService) { SnmpTransportContext snmpTransportContext, SnmpTransportService snmpTransportService,
String snmpUnderlyingProtocol) {
super(UUID.randomUUID()); super(UUID.randomUUID());
super.setDeviceId(device.getId()); super.setDeviceId(device.getId());
super.setDeviceProfile(deviceProfile); super.setDeviceProfile(deviceProfile);
@ -81,6 +83,7 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
this.profileTransportConfiguration = (SnmpDeviceProfileTransportConfiguration) deviceProfile.getProfileData().getTransportConfiguration(); this.profileTransportConfiguration = (SnmpDeviceProfileTransportConfiguration) deviceProfile.getProfileData().getTransportConfiguration();
this.deviceTransportConfiguration = deviceTransportConfiguration; this.deviceTransportConfiguration = deviceTransportConfiguration;
this.snmpUnderlyingProtocol = snmpUnderlyingProtocol;
initTarget(this.profileTransportConfiguration, this.deviceTransportConfiguration); initTarget(this.profileTransportConfiguration, this.deviceTransportConfiguration);
} }
@ -100,14 +103,14 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
@Override @Override
public void onResponse(ResponseEvent event) { public void onResponse(ResponseEvent event) {
if (isActive) { if (isActive) {
snmpTransportService.onNewDeviceResponse(event, this); snmpTransportService.onNewDeviceResponse(this, event);
} }
} }
public void initTarget(SnmpDeviceProfileTransportConfiguration profileTransportConfig, SnmpDeviceTransportConfiguration deviceTransportConfig) { public void initTarget(SnmpDeviceProfileTransportConfiguration profileTransportConfig, SnmpDeviceTransportConfiguration deviceTransportConfig) {
log.trace("Initializing target for SNMP session of device {}", device); log.trace("Initializing target for SNMP session of device {}", device);
CommunityTarget communityTarget = new CommunityTarget(); CommunityTarget communityTarget = new CommunityTarget();
communityTarget.setAddress(GenericAddress.parse(GenericAddress.TYPE_UDP + ":" + deviceTransportConfig.getAddress() + "/" + deviceTransportConfig.getPort())); communityTarget.setAddress(GenericAddress.parse(snmpUnderlyingProtocol + ":" + deviceTransportConfig.getAddress() + "/" + deviceTransportConfig.getPort()));
communityTarget.setVersion(deviceTransportConfig.getProtocolVersion().getCode()); communityTarget.setVersion(deviceTransportConfig.getProtocolVersion().getCode());
communityTarget.setCommunity(new OctetString(deviceTransportConfig.getCommunity())); communityTarget.setCommunity(new OctetString(deviceTransportConfig.getCommunity()));
communityTarget.setTimeout(profileTransportConfig.getTimeoutMs()); communityTarget.setTimeout(profileTransportConfig.getTimeoutMs());

View File

@ -0,0 +1,168 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.snmp;
import org.snmp4j.CommunityTarget;
import org.snmp4j.Target;
import org.snmp4j.TransportMapping;
import org.snmp4j.agent.BaseAgent;
import org.snmp4j.agent.CommandProcessor;
import org.snmp4j.agent.DuplicateRegistrationException;
import org.snmp4j.agent.MOGroup;
import org.snmp4j.agent.ManagedObject;
import org.snmp4j.agent.mo.MOAccessImpl;
import org.snmp4j.agent.mo.MOScalar;
import org.snmp4j.agent.mo.snmp.RowStatus;
import org.snmp4j.agent.mo.snmp.SnmpCommunityMIB;
import org.snmp4j.agent.mo.snmp.SnmpNotificationMIB;
import org.snmp4j.agent.mo.snmp.SnmpTargetMIB;
import org.snmp4j.agent.mo.snmp.StorageType;
import org.snmp4j.agent.mo.snmp.VacmMIB;
import org.snmp4j.agent.security.MutableVACM;
import org.snmp4j.mp.MPv3;
import org.snmp4j.mp.SnmpConstants;
import org.snmp4j.security.SecurityLevel;
import org.snmp4j.security.SecurityModel;
import org.snmp4j.security.USM;
import org.snmp4j.smi.Address;
import org.snmp4j.smi.GenericAddress;
import org.snmp4j.smi.Integer32;
import org.snmp4j.smi.OID;
import org.snmp4j.smi.OctetString;
import org.snmp4j.smi.Variable;
import org.snmp4j.transport.TransportMappings;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Scanner;
public class SnmpDeviceSimulator extends BaseAgent {
public static void main(String[] args) throws IOException {
SnmpDeviceSimulator device = new SnmpDeviceSimulator(1610);
device.start();
device.setUpMappings(Map.of(
".1.3.6.1.2.1.1.1.50", "12",
".1.3.6.1.2.1.2.1.52", "56",
".1.3.6.1.2.1.3.1.54", "yes"
));
new Scanner(System.in).nextLine();
}
private final Target target;
private final Address address;
public SnmpDeviceSimulator(int port) {
super(new File("conf.agent"), new File("bootCounter.agent"),
new CommandProcessor(new OctetString(MPv3.createLocalEngineID())));
CommunityTarget target = new CommunityTarget();
target.setCommunity(new OctetString("public"));
address = GenericAddress.parse("udp:0.0.0.0/" + port);
target.setAddress(address);
target.setRetries(2);
target.setTimeout(1500);
target.setVersion(SnmpConstants.version2c);
this.target = target;
}
public void start() throws IOException {
init();
addShutdownHook();
getServer().addContext(new OctetString("public"));
finishInit();
run();
sendColdStartNotification();
}
public void setUpMappings(Map<String, String> oidToResponseMappings) {
unregisterManagedObject(getSnmpv2MIB());
oidToResponseMappings.forEach((oid, response) -> {
registerManagedObject(new MOScalar<>(new OID(oid), MOAccessImpl.ACCESS_READ_ONLY, new OctetString(response)));
});
}
public Target getTarget() {
return target;
}
@Override
protected void registerManagedObjects() {
}
protected void registerManagedObject(ManagedObject mo) {
try {
server.register(mo, null);
} catch (DuplicateRegistrationException ex) {
throw new RuntimeException(ex);
}
}
protected void unregisterManagedObject(MOGroup moGroup) {
moGroup.unregisterMOs(server, getContext(moGroup));
}
@Override
protected void addNotificationTargets(SnmpTargetMIB targetMIB,
SnmpNotificationMIB notificationMIB) {
}
@Override
protected void addViews(VacmMIB vacm) {
vacm.addGroup(SecurityModel.SECURITY_MODEL_SNMPv2c, new OctetString(
"cpublic"), new OctetString("v1v2group"),
StorageType.nonVolatile);
vacm.addAccess(new OctetString("v1v2group"), new OctetString("public"),
SecurityModel.SECURITY_MODEL_ANY, SecurityLevel.NOAUTH_NOPRIV,
MutableVACM.VACM_MATCH_EXACT, new OctetString("fullReadView"),
new OctetString("fullWriteView"), new OctetString(
"fullNotifyView"), StorageType.nonVolatile);
vacm.addViewTreeFamily(new OctetString("fullReadView"), new OID("1.3"),
new OctetString(), VacmMIB.vacmViewIncluded,
StorageType.nonVolatile);
}
protected void addUsmUser(USM usm) {
}
protected void initTransportMappings() {
transportMappings = new TransportMapping[]{TransportMappings.getInstance().createTransportMapping(address)};
}
protected void unregisterManagedObjects() {
}
protected void addCommunities(SnmpCommunityMIB communityMIB) {
Variable[] com2sec = new Variable[]{
new OctetString("public"),
new OctetString("cpublic"),
getAgent().getContextEngineID(),
new OctetString("public"),
new OctetString(),
new Integer32(StorageType.nonVolatile),
new Integer32(RowStatus.active)
};
SnmpCommunityMIB.SnmpCommunityEntryRow row = communityMIB.getSnmpCommunityEntry().createRow(
new OctetString("public2public").toSubIndex(true), com2sec);
communityMIB.getSnmpCommunityEntry().addRow(row);
}
}

View File

@ -0,0 +1,27 @@
{
"pollPeriodMs": 3000,
"timeoutMs": 500,
"retries": 0,
"telemetryMappings": [
{
"oid": ".1.3.6.1.2.1.1.1.50",
"method": "GET",
"key": "temperature",
"dataType": "LONG"
},
{
"oid": ".1.3.6.1.2.1.2.1.52",
"method": "GET",
"key": "humidity",
"dataType": "LONG"
}
],
"attributesMappings": [
{
"oid": ".1.3.6.1.2.1.3.1.54",
"method": "GET",
"key": "isCool",
"dataType": "STRING"
}
]
}

View File

@ -0,0 +1,6 @@
{
"address": "127.0.0.1",
"port": 1610,
"community": "public",
"protocolVersion": "V2C"
}

View File

@ -1,9 +0,0 @@
{
"deviceName": "Thermostat T1",
"snmpConfig": {
"address": "192.168.1.2",
"port": 161,
"community": "U5J=$HWj6f@7",
"protocolVersion": "v2c"
}
}

View File

@ -1,21 +0,0 @@
{
"poolPeriodMs": 10000,
"timeoutMs": 5000,
"retries": 5,
"attributes": [
{
"key": "snmpNodeManagerEmail",
"type": "STRING",
"method": "get",
"oid": ".1.3.6.1.2.1.1.4.0"
}
],
"telemetry": [
{
"key": "snmpNodeSysUpTime",
"type": "LONG",
"method": "get",
"oid": ".1.3.6.1.2.1.1.3.0"
}
]
}

View File

@ -189,6 +189,11 @@ transport:
# Local LwM2M transport parameters # Local LwM2M transport parameters
snmp: snmp:
enabled: "${SNMP_ENABLED:true}" enabled: "${SNMP_ENABLED:true}"
response_processing:
# parallelism level for executor (workStealingPool) that is responsible for handling responses from SNMP devices
parallelism_level: "${SNMP_RESPONSE_PROCESSING_PARALLELISM_LEVEL:20}"
# to configure SNMP to work over UDP or TCP
underlying_protocol: "${SNMP_UNDERLYING_PROTOCOL:udp}"
queue: queue:
type: "${TB_QUEUE_TYPE:in-memory}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) type: "${TB_QUEUE_TYPE:in-memory}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)