diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 4af723f3a0..9b2e3022a9 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -390,7 +390,8 @@ public class DefaultTransportService implements TransportService { metaData.putValue("deviceName", sessionInfo.getDeviceName()); metaData.putValue("deviceType", sessionInfo.getDeviceType()); metaData.putValue("notifyDevice", "false"); - sendToRuleEngine(tenantId, deviceId, sessionInfo, json, metaData, SessionMsgType.POST_ATTRIBUTES_REQUEST, new TransportTbQueueCallback(callback)); + sendToRuleEngine(tenantId, deviceId, sessionInfo, json, metaData, SessionMsgType.POST_ATTRIBUTES_REQUEST, + new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, msg.getKvList().size(), callback))); } } @@ -399,7 +400,7 @@ public class DefaultTransportService implements TransportService { if (checkLimits(sessionInfo, msg, callback)) { reportActivityInternal(sessionInfo); sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setGetAttributes(msg).build(), callback); + .setGetAttributes(msg).build(), new ApiStatsProxyCallback<>(getTenantId(sessionInfo), 1, callback)); } } @@ -409,7 +410,7 @@ public class DefaultTransportService implements TransportService { SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo); sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe()); sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setSubscribeToAttributes(msg).build(), callback); + .setSubscribeToAttributes(msg).build(), new ApiStatsProxyCallback<>(getTenantId(sessionInfo), 1, callback)); } } @@ -419,7 +420,7 @@ public class DefaultTransportService implements TransportService { SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo); sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe()); sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setSubscribeToRPC(msg).build(), callback); + .setSubscribeToRPC(msg).build(), new ApiStatsProxyCallback<>(getTenantId(sessionInfo), 1, callback)); } } @@ -428,7 +429,7 @@ public class DefaultTransportService implements TransportService { if (checkLimits(sessionInfo, msg, callback)) { reportActivityInternal(sessionInfo); sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setToDeviceRPCCallResponse(msg).build(), callback); + .setToDeviceRPCCallResponse(msg).build(), new ApiStatsProxyCallback<>(getTenantId(sessionInfo), 1, callback)); } } @@ -805,7 +806,7 @@ public class DefaultTransportService implements TransportService { @Override public void onFailure(Throwable t) { - callback.onError(t); + DefaultTransportService.this.transportCallbackExecutor.submit(() -> callback.onError(t)); } }