Resend only failed messages and not the entire pack

This commit is contained in:
Volodymyr Babak 2021-06-01 16:16:51 +03:00
parent d79ab8880d
commit 7fa6cbd53f
18 changed files with 75 additions and 7 deletions

View File

@ -27,6 +27,7 @@ import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.checker.nullness.qual.Nullable;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventActionType;
@ -83,7 +84,9 @@ import org.thingsboard.server.service.edge.rpc.fetch.TenantWidgetsBundlesEdgeEve
import java.io.Closeable; import java.io.Closeable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
@ -108,6 +111,8 @@ public final class EdgeGrpcSession implements Closeable {
private final Consumer<EdgeId> sessionCloseListener; private final Consumer<EdgeId> sessionCloseListener;
private final ObjectMapper mapper; private final ObjectMapper mapper;
private final Map<Integer, DownlinkMsg> pendingMsgsMap;
private EdgeContextComponent ctx; private EdgeContextComponent ctx;
private Edge edge; private Edge edge;
private StreamObserver<RequestMsg> inputStream; private StreamObserver<RequestMsg> inputStream;
@ -128,6 +133,7 @@ public final class EdgeGrpcSession implements Closeable {
this.sessionCloseListener = sessionCloseListener; this.sessionCloseListener = sessionCloseListener;
this.mapper = mapper; this.mapper = mapper;
this.syncExecutorService = syncExecutorService; this.syncExecutorService = syncExecutorService;
this.pendingMsgsMap = new HashMap<>();
initInputStream(); initInputStream();
} }
@ -214,6 +220,7 @@ public final class EdgeGrpcSession implements Closeable {
startProcessingEdgeEvents(new DashboardsEdgeEventFetcher(ctx.getDashboardService())); startProcessingEdgeEvents(new DashboardsEdgeEventFetcher(ctx.getDashboardService()));
DownlinkMsg syncCompleteDownlinkMsg = DownlinkMsg.newBuilder() DownlinkMsg syncCompleteDownlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.setSyncCompletedMsg(SyncCompletedMsg.newBuilder().build()) .setSyncCompletedMsg(SyncCompletedMsg.newBuilder().build())
.build(); .build();
sendDownlinkMsgsPack(Collections.singletonList(syncCompleteDownlinkMsg)); sendDownlinkMsgsPack(Collections.singletonList(syncCompleteDownlinkMsg));
@ -228,7 +235,9 @@ public final class EdgeGrpcSession implements Closeable {
Futures.addCallback(future, new FutureCallback<>() { Futures.addCallback(future, new FutureCallback<>() {
@Override @Override
public void onSuccess(@Nullable List<Void> result) { public void onSuccess(@Nullable List<Void> result) {
UplinkResponseMsg uplinkResponseMsg = UplinkResponseMsg.newBuilder().setSuccess(true).build(); UplinkResponseMsg uplinkResponseMsg = UplinkResponseMsg.newBuilder()
.setUplinkMsgId(uplinkMsg.getUplinkMsgId())
.setSuccess(true).build();
sendDownlinkMsg(ResponseMsg.newBuilder() sendDownlinkMsg(ResponseMsg.newBuilder()
.setUplinkResponseMsg(uplinkResponseMsg) .setUplinkResponseMsg(uplinkResponseMsg)
.build()); .build());
@ -236,7 +245,9 @@ public final class EdgeGrpcSession implements Closeable {
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
UplinkResponseMsg uplinkResponseMsg = UplinkResponseMsg.newBuilder().setSuccess(false).setErrorMsg(t.getMessage()).build(); UplinkResponseMsg uplinkResponseMsg = UplinkResponseMsg.newBuilder()
.setUplinkMsgId(uplinkMsg.getUplinkMsgId())
.setSuccess(false).setErrorMsg(t.getMessage()).build();
sendDownlinkMsg(ResponseMsg.newBuilder() sendDownlinkMsg(ResponseMsg.newBuilder()
.setUplinkResponseMsg(uplinkResponseMsg) .setUplinkResponseMsg(uplinkResponseMsg)
.build()); .build());
@ -247,6 +258,7 @@ public final class EdgeGrpcSession implements Closeable {
private void onDownlinkResponse(DownlinkResponseMsg msg) { private void onDownlinkResponse(DownlinkResponseMsg msg) {
try { try {
if (msg.getSuccess()) { if (msg.getSuccess()) {
pendingMsgsMap.remove(msg.getDownlinkMsgId());
log.debug("[{}] Msg has been processed successfully! {}", edge.getRoutingKey(), msg); log.debug("[{}] Msg has been processed successfully! {}", edge.getRoutingKey(), msg);
} else { } else {
log.error("[{}] Msg processing failed! Error msg: {}", edge.getRoutingKey(), msg.getErrorMsg()); log.error("[{}] Msg processing failed! Error msg: {}", edge.getRoutingKey(), msg.getErrorMsg());
@ -325,17 +337,19 @@ public final class EdgeGrpcSession implements Closeable {
private boolean sendDownlinkMsgsPack(List<DownlinkMsg> downlinkMsgsPack) throws InterruptedException { private boolean sendDownlinkMsgsPack(List<DownlinkMsg> downlinkMsgsPack) throws InterruptedException {
boolean success; boolean success;
pendingMsgsMap.clear();
downlinkMsgsPack.forEach(msg -> pendingMsgsMap.put(msg.getDownlinkMsgId(), msg));
do { do {
log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, downlinkMsgsPack.size()); log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, pendingMsgsMap.values().size());
latch = new CountDownLatch(downlinkMsgsPack.size()); latch = new CountDownLatch(pendingMsgsMap.values().size());
for (DownlinkMsg downlinkMsg : downlinkMsgsPack) { for (DownlinkMsg downlinkMsg : pendingMsgsMap.values()) {
sendDownlinkMsg(ResponseMsg.newBuilder() sendDownlinkMsg(ResponseMsg.newBuilder()
.setDownlinkMsg(downlinkMsg) .setDownlinkMsg(downlinkMsg)
.build()); .build());
} }
success = latch.await(10, TimeUnit.SECONDS); success = latch.await(10, TimeUnit.SECONDS);
if (!success) { if (!success) {
log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, downlinkMsgsPack); log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, pendingMsgsMap.values());
} }
if (isConnected() && !success) { if (isConnected() && !success) {
try { try {
@ -383,6 +397,8 @@ public final class EdgeGrpcSession implements Closeable {
case RPC_CALL: case RPC_CALL:
downlinkMsg = ctx.getDeviceProcessor().processRpcCallMsgToEdge(edgeEvent); downlinkMsg = ctx.getDeviceProcessor().processRpcCallMsgToEdge(edgeEvent);
break; break;
default:
log.warn("[{}][{}] Unsupported action type [{}]", edge.getTenantId(), this.sessionId, edgeEvent.getAction());
} }
} catch (Exception e) { } catch (Exception e) {
log.error("[{}][{}] Exception during converting edge event to downlink msg", edge.getTenantId(), this.sessionId, e); log.error("[{}][{}] Exception during converting edge event to downlink msg", edge.getTenantId(), this.sessionId, e);

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.service.edge.rpc.processor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.AdminSettings; import org.thingsboard.server.common.data.AdminSettings;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.gen.edge.AdminSettingsUpdateMsg; import org.thingsboard.server.gen.edge.AdminSettingsUpdateMsg;
import org.thingsboard.server.gen.edge.DownlinkMsg; import org.thingsboard.server.gen.edge.DownlinkMsg;
@ -34,6 +35,7 @@ public class AdminSettingsEdgeProcessor extends BaseEdgeProcessor {
AdminSettings adminSettings = mapper.convertValue(edgeEvent.getBody(), AdminSettings.class); AdminSettings adminSettings = mapper.convertValue(edgeEvent.getBody(), AdminSettings.class);
AdminSettingsUpdateMsg t = adminSettingsMsgConstructor.constructAdminSettingsUpdateMsg(adminSettings); AdminSettingsUpdateMsg t = adminSettingsMsgConstructor.constructAdminSettingsUpdateMsg(adminSettings);
return DownlinkMsg.newBuilder() return DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllAdminSettingsUpdateMsg(Collections.singletonList(t)) .addAllAdminSettingsUpdateMsg(Collections.singletonList(t))
.build(); .build();
} }

View File

@ -121,6 +121,7 @@ public class AlarmEdgeProcessor extends BaseEdgeProcessor {
Alarm alarm = alarmService.findAlarmByIdAsync(edgeEvent.getTenantId(), alarmId).get(); Alarm alarm = alarmService.findAlarmByIdAsync(edgeEvent.getTenantId(), alarmId).get();
if (alarm != null) { if (alarm != null) {
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllAlarmUpdateMsg(Collections.singletonList(alarmMsgConstructor.constructAlarmUpdatedMsg(edge.getTenantId(), msgType, alarm))) .addAllAlarmUpdateMsg(Collections.singletonList(alarmMsgConstructor.constructAlarmUpdatedMsg(edge.getTenantId(), msgType, alarm)))
.build(); .build();
} }

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.service.edge.rpc.processor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
@ -50,6 +51,7 @@ public class AssetEdgeProcessor extends BaseEdgeProcessor {
AssetUpdateMsg assetUpdateMsg = AssetUpdateMsg assetUpdateMsg =
assetMsgConstructor.constructAssetUpdatedMsg(msgType, asset, customerId); assetMsgConstructor.constructAssetUpdatedMsg(msgType, asset, customerId);
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllAssetUpdateMsg(Collections.singletonList(assetUpdateMsg)) .addAllAssetUpdateMsg(Collections.singletonList(assetUpdateMsg))
.build(); .build();
} }
@ -59,6 +61,7 @@ public class AssetEdgeProcessor extends BaseEdgeProcessor {
AssetUpdateMsg assetUpdateMsg = AssetUpdateMsg assetUpdateMsg =
assetMsgConstructor.constructAssetDeleteMsg(assetId); assetMsgConstructor.constructAssetDeleteMsg(assetId);
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllAssetUpdateMsg(Collections.singletonList(assetUpdateMsg)) .addAllAssetUpdateMsg(Collections.singletonList(assetUpdateMsg))
.build(); .build();
break; break;

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.service.edge.rpc.processor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventActionType;
@ -53,6 +54,7 @@ public class CustomerEdgeProcessor extends BaseEdgeProcessor {
CustomerUpdateMsg customerUpdateMsg = CustomerUpdateMsg customerUpdateMsg =
customerMsgConstructor.constructCustomerUpdatedMsg(msgType, customer); customerMsgConstructor.constructCustomerUpdatedMsg(msgType, customer);
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllCustomerUpdateMsg(Collections.singletonList(customerUpdateMsg)) .addAllCustomerUpdateMsg(Collections.singletonList(customerUpdateMsg))
.build(); .build();
} }
@ -61,6 +63,7 @@ public class CustomerEdgeProcessor extends BaseEdgeProcessor {
CustomerUpdateMsg customerUpdateMsg = CustomerUpdateMsg customerUpdateMsg =
customerMsgConstructor.constructCustomerDeleteMsg(customerId); customerMsgConstructor.constructCustomerDeleteMsg(customerId);
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllCustomerUpdateMsg(Collections.singletonList(customerUpdateMsg)) .addAllCustomerUpdateMsg(Collections.singletonList(customerUpdateMsg))
.build(); .build();
break; break;

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.service.edge.rpc.processor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.Dashboard; import org.thingsboard.server.common.data.Dashboard;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventActionType;
@ -53,6 +54,7 @@ public class DashboardEdgeProcessor extends BaseEdgeProcessor {
DashboardUpdateMsg dashboardUpdateMsg = DashboardUpdateMsg dashboardUpdateMsg =
dashboardMsgConstructor.constructDashboardUpdatedMsg(msgType, dashboard, customerId); dashboardMsgConstructor.constructDashboardUpdatedMsg(msgType, dashboard, customerId);
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllDashboardUpdateMsg(Collections.singletonList(dashboardUpdateMsg)) .addAllDashboardUpdateMsg(Collections.singletonList(dashboardUpdateMsg))
.build(); .build();
} }
@ -62,6 +64,7 @@ public class DashboardEdgeProcessor extends BaseEdgeProcessor {
DashboardUpdateMsg dashboardUpdateMsg = DashboardUpdateMsg dashboardUpdateMsg =
dashboardMsgConstructor.constructDashboardDeleteMsg(dashboardId); dashboardMsgConstructor.constructDashboardDeleteMsg(dashboardId);
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllDashboardUpdateMsg(Collections.singletonList(dashboardUpdateMsg)) .addAllDashboardUpdateMsg(Collections.singletonList(dashboardUpdateMsg))
.build(); .build();
break; break;

View File

@ -24,10 +24,12 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.RpcError; import org.thingsboard.rule.engine.api.RpcError;
import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventActionType;
@ -48,7 +50,6 @@ import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.gen.edge.DeviceCredentialsUpdateMsg; import org.thingsboard.server.gen.edge.DeviceCredentialsUpdateMsg;
import org.thingsboard.server.gen.edge.DeviceRpcCallMsg; import org.thingsboard.server.gen.edge.DeviceRpcCallMsg;
import org.thingsboard.server.gen.edge.DeviceUpdateMsg; import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
@ -308,6 +309,7 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
DeviceUpdateMsg deviceUpdateMsg = DeviceUpdateMsg deviceUpdateMsg =
deviceMsgConstructor.constructDeviceUpdatedMsg(msgType, device, customerId, null); deviceMsgConstructor.constructDeviceUpdatedMsg(msgType, device, customerId, null);
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllDeviceUpdateMsg(Collections.singletonList(deviceUpdateMsg)) .addAllDeviceUpdateMsg(Collections.singletonList(deviceUpdateMsg))
.build(); .build();
} }
@ -317,6 +319,7 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
DeviceUpdateMsg deviceUpdateMsg = DeviceUpdateMsg deviceUpdateMsg =
deviceMsgConstructor.constructDeviceDeleteMsg(deviceId); deviceMsgConstructor.constructDeviceDeleteMsg(deviceId);
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllDeviceUpdateMsg(Collections.singletonList(deviceUpdateMsg)) .addAllDeviceUpdateMsg(Collections.singletonList(deviceUpdateMsg))
.build(); .build();
break; break;
@ -326,6 +329,7 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg = DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg =
deviceMsgConstructor.constructDeviceCredentialsUpdatedMsg(deviceCredentials); deviceMsgConstructor.constructDeviceCredentialsUpdatedMsg(deviceCredentials);
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllDeviceCredentialsUpdateMsg(Collections.singletonList(deviceCredentialsUpdateMsg)) .addAllDeviceCredentialsUpdateMsg(Collections.singletonList(deviceCredentialsUpdateMsg))
.build(); .build();
} }
@ -339,6 +343,7 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
DeviceRpcCallMsg deviceRpcCallMsg = DeviceRpcCallMsg deviceRpcCallMsg =
deviceMsgConstructor.constructDeviceRpcCallMsg(edgeEvent.getEntityId(), edgeEvent.getBody()); deviceMsgConstructor.constructDeviceRpcCallMsg(edgeEvent.getEntityId(), edgeEvent.getBody());
return DownlinkMsg.newBuilder() return DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllDeviceRpcCallMsg(Collections.singletonList(deviceRpcCallMsg)) .addAllDeviceRpcCallMsg(Collections.singletonList(deviceRpcCallMsg))
.build(); .build();
} }

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.service.edge.rpc.processor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.DeviceProfileId;
@ -44,6 +45,7 @@ public class DeviceProfileEdgeProcessor extends BaseEdgeProcessor {
DeviceProfileUpdateMsg deviceProfileUpdateMsg = DeviceProfileUpdateMsg deviceProfileUpdateMsg =
deviceProfileMsgConstructor.constructDeviceProfileUpdatedMsg(msgType, deviceProfile); deviceProfileMsgConstructor.constructDeviceProfileUpdatedMsg(msgType, deviceProfile);
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllDeviceProfileUpdateMsg(Collections.singletonList(deviceProfileUpdateMsg)) .addAllDeviceProfileUpdateMsg(Collections.singletonList(deviceProfileUpdateMsg))
.build(); .build();
} }
@ -52,6 +54,7 @@ public class DeviceProfileEdgeProcessor extends BaseEdgeProcessor {
DeviceProfileUpdateMsg deviceProfileUpdateMsg = DeviceProfileUpdateMsg deviceProfileUpdateMsg =
deviceProfileMsgConstructor.constructDeviceProfileDeleteMsg(deviceProfileId); deviceProfileMsgConstructor.constructDeviceProfileDeleteMsg(deviceProfileId);
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllDeviceProfileUpdateMsg(Collections.singletonList(deviceProfileUpdateMsg)) .addAllDeviceProfileUpdateMsg(Collections.singletonList(deviceProfileUpdateMsg))
.build(); .build();
break; break;

View File

@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventActionType;
@ -66,6 +67,7 @@ public class EntityEdgeProcessor extends BaseEdgeProcessor {
DeviceUpdateMsg d = deviceMsgConstructor DeviceUpdateMsg d = deviceMsgConstructor
.constructDeviceUpdatedMsg(UpdateMsgType.ENTITY_MERGE_RPC_MESSAGE, device, customerId, conflictName); .constructDeviceUpdatedMsg(UpdateMsgType.ENTITY_MERGE_RPC_MESSAGE, device, customerId, conflictName);
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllDeviceUpdateMsg(Collections.singletonList(d)) .addAllDeviceUpdateMsg(Collections.singletonList(d))
.build(); .build();
} }
@ -81,6 +83,7 @@ public class EntityEdgeProcessor extends BaseEdgeProcessor {
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.build(); .build();
DownlinkMsg.Builder builder = DownlinkMsg.newBuilder() DownlinkMsg.Builder builder = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllDeviceCredentialsRequestMsg(Collections.singletonList(deviceCredentialsRequestMsg)); .addAllDeviceCredentialsRequestMsg(Collections.singletonList(deviceCredentialsRequestMsg));
downlinkMsg = builder.build(); downlinkMsg = builder.build();
} }

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.service.edge.rpc.processor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
@ -50,6 +51,7 @@ public class EntityViewEdgeProcessor extends BaseEdgeProcessor {
EntityViewUpdateMsg entityViewUpdateMsg = EntityViewUpdateMsg entityViewUpdateMsg =
entityViewMsgConstructor.constructEntityViewUpdatedMsg(msgType, entityView, customerId); entityViewMsgConstructor.constructEntityViewUpdatedMsg(msgType, entityView, customerId);
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllEntityViewUpdateMsg(Collections.singletonList(entityViewUpdateMsg)) .addAllEntityViewUpdateMsg(Collections.singletonList(entityViewUpdateMsg))
.build(); .build();
} }
@ -59,6 +61,7 @@ public class EntityViewEdgeProcessor extends BaseEdgeProcessor {
EntityViewUpdateMsg entityViewUpdateMsg = EntityViewUpdateMsg entityViewUpdateMsg =
entityViewMsgConstructor.constructEntityViewDeleteMsg(entityViewId); entityViewMsgConstructor.constructEntityViewDeleteMsg(entityViewId);
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllEntityViewUpdateMsg(Collections.singletonList(entityViewUpdateMsg)) .addAllEntityViewUpdateMsg(Collections.singletonList(entityViewUpdateMsg))
.build(); .build();
break; break;

View File

@ -20,6 +20,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventActionType;
@ -119,6 +120,7 @@ public class RelationEdgeProcessor extends BaseEdgeProcessor {
EntityRelation entityRelation = mapper.convertValue(edgeEvent.getBody(), EntityRelation.class); EntityRelation entityRelation = mapper.convertValue(edgeEvent.getBody(), EntityRelation.class);
RelationUpdateMsg r = relationMsgConstructor.constructRelationUpdatedMsg(msgType, entityRelation); RelationUpdateMsg r = relationMsgConstructor.constructRelationUpdatedMsg(msgType, entityRelation);
return DownlinkMsg.newBuilder() return DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllRelationUpdateMsg(Collections.singletonList(r)) .addAllRelationUpdateMsg(Collections.singletonList(r))
.build(); .build();
} }

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.service.edge.rpc.processor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventActionType;
@ -48,6 +49,7 @@ public class RuleChainEdgeProcessor extends BaseEdgeProcessor {
RuleChainUpdateMsg ruleChainUpdateMsg = RuleChainUpdateMsg ruleChainUpdateMsg =
ruleChainMsgConstructor.constructRuleChainUpdatedMsg(edge.getRootRuleChainId(), msgType, ruleChain); ruleChainMsgConstructor.constructRuleChainUpdatedMsg(edge.getRootRuleChainId(), msgType, ruleChain);
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllRuleChainUpdateMsg(Collections.singletonList(ruleChainUpdateMsg)) .addAllRuleChainUpdateMsg(Collections.singletonList(ruleChainUpdateMsg))
.build(); .build();
} }
@ -55,6 +57,7 @@ public class RuleChainEdgeProcessor extends BaseEdgeProcessor {
case DELETED: case DELETED:
case UNASSIGNED_FROM_EDGE: case UNASSIGNED_FROM_EDGE:
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllRuleChainUpdateMsg(Collections.singletonList(ruleChainMsgConstructor.constructRuleChainDeleteMsg(ruleChainId))) .addAllRuleChainUpdateMsg(Collections.singletonList(ruleChainMsgConstructor.constructRuleChainDeleteMsg(ruleChainId)))
.build(); .build();
break; break;
@ -72,6 +75,7 @@ public class RuleChainEdgeProcessor extends BaseEdgeProcessor {
ruleChainMsgConstructor.constructRuleChainMetadataUpdatedMsg(msgType, ruleChainMetaData); ruleChainMsgConstructor.constructRuleChainMetadataUpdatedMsg(msgType, ruleChainMetaData);
if (ruleChainMetadataUpdateMsg != null) { if (ruleChainMetadataUpdateMsg != null) {
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllRuleChainMetadataUpdateMsg(Collections.singletonList(ruleChainMetadataUpdateMsg)) .addAllRuleChainMetadataUpdateMsg(Collections.singletonList(ruleChainMetadataUpdateMsg))
.build(); .build();
} }

View File

@ -31,6 +31,7 @@ import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.asset.Asset;
@ -321,6 +322,7 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
private DownlinkMsg constructEntityDataProtoMsg(EntityId entityId, EdgeEventActionType actionType, JsonElement entityData) { private DownlinkMsg constructEntityDataProtoMsg(EntityId entityId, EdgeEventActionType actionType, JsonElement entityData) {
EntityDataProto entityDataProto = entityDataMsgConstructor.constructEntityDataMsg(entityId, actionType, entityData); EntityDataProto entityDataProto = entityDataMsgConstructor.constructEntityDataMsg(entityId, actionType, entityData);
return DownlinkMsg.newBuilder() return DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllEntityData(Collections.singletonList(entityDataProto)) .addAllEntityData(Collections.singletonList(entityDataProto))
.build(); .build();
} }

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.service.edge.rpc.processor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
@ -46,12 +47,14 @@ public class UserEdgeProcessor extends BaseEdgeProcessor {
if (user != null) { if (user != null) {
CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(user, edge); CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(user, edge);
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllUserUpdateMsg(Collections.singletonList(userMsgConstructor.constructUserUpdatedMsg(msgType, user, customerId))) .addAllUserUpdateMsg(Collections.singletonList(userMsgConstructor.constructUserUpdatedMsg(msgType, user, customerId)))
.build(); .build();
} }
break; break;
case DELETED: case DELETED:
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllUserUpdateMsg(Collections.singletonList(userMsgConstructor.constructUserDeleteMsg(userId))) .addAllUserUpdateMsg(Collections.singletonList(userMsgConstructor.constructUserDeleteMsg(userId)))
.build(); .build();
break; break;
@ -61,6 +64,7 @@ public class UserEdgeProcessor extends BaseEdgeProcessor {
UserCredentialsUpdateMsg userCredentialsUpdateMsg = UserCredentialsUpdateMsg userCredentialsUpdateMsg =
userMsgConstructor.constructUserCredentialsUpdatedMsg(userCredentialsByUserId); userMsgConstructor.constructUserCredentialsUpdatedMsg(userCredentialsByUserId);
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllUserCredentialsUpdateMsg(Collections.singletonList(userCredentialsUpdateMsg)) .addAllUserCredentialsUpdateMsg(Collections.singletonList(userCredentialsUpdateMsg))
.build(); .build();
} }

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.service.edge.rpc.processor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.id.WidgetsBundleId; import org.thingsboard.server.common.data.id.WidgetsBundleId;
@ -44,6 +45,7 @@ public class WidgetBundleEdgeProcessor extends BaseEdgeProcessor {
WidgetsBundleUpdateMsg widgetsBundleUpdateMsg = WidgetsBundleUpdateMsg widgetsBundleUpdateMsg =
widgetsBundleMsgConstructor.constructWidgetsBundleUpdateMsg(msgType, widgetsBundle); widgetsBundleMsgConstructor.constructWidgetsBundleUpdateMsg(msgType, widgetsBundle);
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllWidgetsBundleUpdateMsg(Collections.singletonList(widgetsBundleUpdateMsg)) .addAllWidgetsBundleUpdateMsg(Collections.singletonList(widgetsBundleUpdateMsg))
.build(); .build();
} }
@ -52,6 +54,7 @@ public class WidgetBundleEdgeProcessor extends BaseEdgeProcessor {
WidgetsBundleUpdateMsg widgetsBundleUpdateMsg = WidgetsBundleUpdateMsg widgetsBundleUpdateMsg =
widgetsBundleMsgConstructor.constructWidgetsBundleDeleteMsg(widgetsBundleId); widgetsBundleMsgConstructor.constructWidgetsBundleDeleteMsg(widgetsBundleId);
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllWidgetsBundleUpdateMsg(Collections.singletonList(widgetsBundleUpdateMsg)) .addAllWidgetsBundleUpdateMsg(Collections.singletonList(widgetsBundleUpdateMsg))
.build(); .build();
break; break;

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.service.edge.rpc.processor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.id.WidgetTypeId; import org.thingsboard.server.common.data.id.WidgetTypeId;
@ -44,6 +45,7 @@ public class WidgetTypeEdgeProcessor extends BaseEdgeProcessor {
WidgetTypeUpdateMsg widgetTypeUpdateMsg = WidgetTypeUpdateMsg widgetTypeUpdateMsg =
widgetTypeMsgConstructor.constructWidgetTypeUpdateMsg(msgType, widgetType); widgetTypeMsgConstructor.constructWidgetTypeUpdateMsg(msgType, widgetType);
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllWidgetTypeUpdateMsg(Collections.singletonList(widgetTypeUpdateMsg)) .addAllWidgetTypeUpdateMsg(Collections.singletonList(widgetTypeUpdateMsg))
.build(); .build();
} }
@ -52,6 +54,7 @@ public class WidgetTypeEdgeProcessor extends BaseEdgeProcessor {
WidgetTypeUpdateMsg widgetTypeUpdateMsg = WidgetTypeUpdateMsg widgetTypeUpdateMsg =
widgetTypeMsgConstructor.constructWidgetTypeDeleteMsg(widgetTypeId); widgetTypeMsgConstructor.constructWidgetTypeDeleteMsg(widgetTypeId);
downlinkMsg = DownlinkMsg.newBuilder() downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAllWidgetTypeUpdateMsg(Collections.singletonList(widgetTypeUpdateMsg)) .addAllWidgetTypeUpdateMsg(Collections.singletonList(widgetTypeUpdateMsg))
.build(); .build();
break; break;

View File

@ -18,6 +18,8 @@ package org.thingsboard.server.common.data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.edge.EdgeEventType;
import java.util.concurrent.ThreadLocalRandom;
@Slf4j @Slf4j
public final class EdgeUtils { public final class EdgeUtils {
@ -57,4 +59,8 @@ public final class EdgeUtils {
return null; return null;
} }
} }
public static int nextPositiveInt() {
return ThreadLocalRandom.current().nextInt(0, Integer.MAX_VALUE);
}
} }

View File

@ -434,11 +434,13 @@ message UplinkMsg {
message UplinkResponseMsg { message UplinkResponseMsg {
bool success = 1; bool success = 1;
string errorMsg = 2; string errorMsg = 2;
int32 uplinkMsgId = 3;
} }
message DownlinkResponseMsg { message DownlinkResponseMsg {
bool success = 1; bool success = 1;
string errorMsg = 2; string errorMsg = 2;
int32 downlinkMsgId = 3;
} }
message DownlinkMsg { message DownlinkMsg {