Merge remote-tracking branch 'upstream/master' into feature/edge-multi-customers

This commit is contained in:
Volodymyr Babak 2022-10-12 17:34:32 +03:00
commit cc4029120a
39 changed files with 870 additions and 268 deletions

View File

@ -0,0 +1,74 @@
--
-- Copyright © 2016-2022 The Thingsboard Authors
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
DO
$$
DECLARE table_partition RECORD;
BEGIN
-- in case of running the upgrade script a second time:
IF NOT (SELECT exists(SELECT FROM pg_tables WHERE tablename = 'old_audit_log')) THEN
ALTER TABLE audit_log RENAME TO old_audit_log;
ALTER INDEX IF EXISTS idx_audit_log_tenant_id_and_created_time RENAME TO idx_old_audit_log_tenant_id_and_created_time;
FOR table_partition IN SELECT tablename AS name, split_part(tablename, '_', 3) AS partition_ts
FROM pg_tables WHERE tablename LIKE 'audit_log_%'
LOOP
EXECUTE format('ALTER TABLE %s RENAME TO old_audit_log_%s', table_partition.name, table_partition.partition_ts);
END LOOP;
ELSE
RAISE NOTICE 'Table old_audit_log already exists, leaving as is';
END IF;
END;
$$;
CREATE TABLE IF NOT EXISTS audit_log (
id uuid NOT NULL,
created_time bigint NOT NULL,
tenant_id uuid,
customer_id uuid,
entity_id uuid,
entity_type varchar(255),
entity_name varchar(255),
user_id uuid,
user_name varchar(255),
action_type varchar(255),
action_data varchar(1000000),
action_status varchar(255),
action_failure_details varchar(1000000)
) PARTITION BY RANGE (created_time);
CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time DESC);
CREATE OR REPLACE PROCEDURE migrate_audit_logs(IN start_time_ms BIGINT, IN end_time_ms BIGINT, IN partition_size_ms BIGINT)
LANGUAGE plpgsql AS
$$
DECLARE
p RECORD;
partition_end_ts BIGINT;
BEGIN
FOR p IN SELECT DISTINCT (created_time - created_time % partition_size_ms) AS partition_ts FROM old_audit_log
WHERE created_time >= start_time_ms AND created_time < end_time_ms
LOOP
partition_end_ts = p.partition_ts + partition_size_ms;
RAISE NOTICE '[audit_log] Partition to create : [%-%]', p.partition_ts, partition_end_ts;
EXECUTE format('CREATE TABLE IF NOT EXISTS audit_log_%s PARTITION OF audit_log ' ||
'FOR VALUES FROM ( %s ) TO ( %s )', p.partition_ts, p.partition_ts, partition_end_ts);
END LOOP;
INSERT INTO audit_log
SELECT * FROM old_audit_log
WHERE created_time >= start_time_ms AND created_time < end_time_ms;
END;
$$;

View File

@ -36,7 +36,7 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
import org.thingsboard.server.common.msg.edge.EdgeSessionMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
@ -105,7 +105,9 @@ public class AppActor extends ContextAwareActor {
onToDeviceActorMsg((TenantAwareMsg) msg, true);
break;
case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
onToTenantActorMsg((EdgeEventUpdateMsg) msg);
case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG:
case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG:
onToEdgeSessionMsg((EdgeSessionMsg) msg);
break;
case SESSION_TIMEOUT_MSG:
ctx.broadcastToChildrenByType(msg, EntityType.TENANT);
@ -193,7 +195,7 @@ public class AppActor extends ContextAwareActor {
() -> new TenantActor.ActorCreator(systemContext, tenantId));
}
private void onToTenantActorMsg(EdgeEventUpdateMsg msg) {
private void onToEdgeSessionMsg(EdgeSessionMsg msg) {
TbActorRef target = null;
if (ModelConstants.SYSTEM_TENANT.equals(msg.getTenantId())) {
log.warn("Message has system tenant id: {}", msg);
@ -203,7 +205,7 @@ public class AppActor extends ContextAwareActor {
if (target != null) {
target.tellWithHighPriority(msg);
} else {
log.debug("[{}] Invalid edge event update msg: {}", msg.getTenantId(), msg);
log.debug("[{}] Invalid edge session msg: {}", msg.getTenantId(), msg);
}
}

View File

@ -47,7 +47,7 @@ import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
import org.thingsboard.server.common.msg.edge.EdgeSessionMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
@ -167,7 +167,9 @@ public class TenantActor extends RuleChainManagerActor {
onRuleChainMsg((RuleChainAwareMsg) msg);
break;
case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
onToEdgeSessionMsg((EdgeEventUpdateMsg) msg);
case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG:
case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG:
onToEdgeSessionMsg((EdgeSessionMsg) msg);
break;
default:
return false;
@ -269,9 +271,8 @@ public class TenantActor extends RuleChainManagerActor {
() -> new DeviceActorCreator(systemContext, tenantId, deviceId));
}
private void onToEdgeSessionMsg(EdgeEventUpdateMsg msg) {
log.trace("[{}] onToEdgeSessionMsg [{}]", msg.getTenantId(), msg);
systemContext.getEdgeRpcService().onEdgeEvent(tenantId, msg.getEdgeId());
private void onToEdgeSessionMsg(EdgeSessionMsg msg) {
systemContext.getEdgeRpcService().onToEdgeSessionMsg(tenantId, msg);
}
private ApiUsageState getApiUsageState() {

View File

@ -279,10 +279,7 @@ public abstract class BaseController {
protected EdgeService edgeService;
@Autowired(required = false)
protected EdgeNotificationService edgeNotificationService;
@Autowired(required = false)
protected EdgeRpcService edgeGrpcService;
protected EdgeRpcService edgeRpcService;
@Autowired
protected TbNotificationEntityService notificationEntityService;

View File

@ -22,6 +22,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
@ -32,6 +33,7 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.thingsboard.rule.engine.flow.TbRuleChainInputNode;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.EntitySubtype;
@ -47,6 +49,8 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.model.ModelConstants;
@ -61,6 +65,7 @@ import org.thingsboard.server.service.security.permission.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.thingsboard.server.controller.ControllerConstants.CUSTOMER_ID_PARAM_DESCRIPTION;
@ -529,24 +534,35 @@ public class EdgeController extends BaseController {
"All entities that are assigned to particular edge are going to be send to remote edge service." + TENANT_AUTHORITY_PARAGRAPH)
@PreAuthorize("hasAuthority('TENANT_ADMIN')")
@RequestMapping(value = "/edge/sync/{edgeId}", method = RequestMethod.POST)
public void syncEdge(@ApiParam(value = EDGE_ID_PARAM_DESCRIPTION, required = true)
public DeferredResult<ResponseEntity> syncEdge(@ApiParam(value = EDGE_ID_PARAM_DESCRIPTION, required = true)
@PathVariable("edgeId") String strEdgeId) throws ThingsboardException {
checkParameter("edgeId", strEdgeId);
try {
final DeferredResult<ResponseEntity> response = new DeferredResult<>();
if (isEdgesEnabled()) {
EdgeId edgeId = new EdgeId(toUUID(strEdgeId));
edgeId = checkNotNull(edgeId);
SecurityUser user = getCurrentUser();
TenantId tenantId = user.getTenantId();
edgeGrpcService.startSyncProcess(tenantId, edgeId);
ToEdgeSyncRequest request = new ToEdgeSyncRequest(UUID.randomUUID(), tenantId, edgeId);
edgeRpcService.processSyncRequest(request, fromEdgeSyncResponse -> reply(response, fromEdgeSyncResponse));
} else {
throw new ThingsboardException("Edges support disabled", ThingsboardErrorCode.GENERAL);
}
return response;
} catch (Exception e) {
throw handleException(e);
}
}
private void reply(DeferredResult<ResponseEntity> response, FromEdgeSyncResponse fromEdgeSyncResponse) {
if (fromEdgeSyncResponse.isSuccess()) {
response.setResult(new ResponseEntity<>(HttpStatus.OK));
} else {
response.setErrorResult(new ThingsboardException("Edge is not connected", ThingsboardErrorCode.GENERAL));
}
}
@ApiOperation(value = "Find missing rule chains (findMissingToRelatedRuleChains)",
notes = "Returns list of rule chains ids that are not assigned to particular edge, but these rule chains are present in the already assigned rule chains to edge." + TENANT_AUTHORITY_PARAGRAPH)
@PreAuthorize("hasAuthority('TENANT_ADMIN')")

View File

@ -228,6 +228,7 @@ public class ThingsboardInstallService {
case "3.4.1":
log.info("Upgrading ThingsBoard from version 3.4.1 to 3.4.2 ...");
databaseEntitiesUpgradeService.upgradeDatabase("3.4.1");
dataUpdateService.updateData("3.4.1");
log.info("Updating system data...");
systemDataLoaderService.updateSystemWidgets();
break;

View File

@ -26,6 +26,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.ResourceUtils;
import org.thingsboard.server.common.data.edge.Edge;
@ -34,6 +35,10 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
import org.thingsboard.server.common.msg.edge.EdgeSessionMsg;
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
import org.thingsboard.server.gen.edge.v1.EdgeRpcServiceGrpc;
import org.thingsboard.server.gen.edge.v1.RequestMsg;
import org.thingsboard.server.gen.edge.v1.ResponseMsg;
@ -59,6 +64,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
@Service
@Slf4j
@ -71,6 +77,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
private final Map<EdgeId, Boolean> sessionNewEvents = new HashMap<>();
private final ConcurrentMap<EdgeId, ScheduledFuture<?>> sessionEdgeEventChecks = new ConcurrentHashMap<>();
private final ConcurrentMap<UUID, Consumer<FromEdgeSyncResponse>> localSyncEdgeRequests = new ConcurrentHashMap<>();
@Value("${edges.rpc.port}")
private int rpcPort;
@Value("${edges.rpc.ssl.enabled}")
@ -98,12 +106,17 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
@Autowired
private TelemetrySubscriptionService tsSubService;
@Autowired
private TbClusterService clusterService;
private Server server;
private ScheduledExecutorService edgeEventProcessingExecutorService;
private ScheduledExecutorService sendDownlinkExecutorService;
private ScheduledExecutorService executorService;
@PostConstruct
public void init() {
log.info("Initializing Edge RPC service!");
@ -129,8 +142,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
log.error("Failed to start Edge RPC server!", e);
throw new RuntimeException("Failed to start Edge RPC server!");
}
this.edgeEventProcessingExecutorService = Executors.newScheduledThreadPool(schedulerPoolSize, ThingsBoardThreadFactory.forName("edge-scheduler"));
this.edgeEventProcessingExecutorService = Executors.newScheduledThreadPool(schedulerPoolSize, ThingsBoardThreadFactory.forName("edge-event-check-scheduler"));
this.sendDownlinkExecutorService = Executors.newScheduledThreadPool(sendSchedulerPoolSize, ThingsBoardThreadFactory.forName("edge-send-scheduler"));
this.executorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("edge-service"));
log.info("Edge RPC service initialized!");
}
@ -153,6 +167,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
if (sendDownlinkExecutorService != null) {
sendDownlinkExecutorService.shutdownNow();
}
if (executorService != null) {
executorService.shutdownNow();
}
}
@Override
@ -160,47 +177,76 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, sendDownlinkExecutorService).getInputStream();
}
@Override
public void onToEdgeSessionMsg(TenantId tenantId, EdgeSessionMsg msg) {
executorService.execute(() -> {
switch (msg.getMsgType()) {
case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
EdgeEventUpdateMsg edgeEventUpdateMsg = (EdgeEventUpdateMsg) msg;
log.trace("[{}] onToEdgeSessionMsg [{}]", edgeEventUpdateMsg.getTenantId(), msg);
onEdgeEvent(tenantId, edgeEventUpdateMsg.getEdgeId());
break;
case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG:
ToEdgeSyncRequest toEdgeSyncRequest = (ToEdgeSyncRequest) msg;
log.trace("[{}] toEdgeSyncRequest [{}]", toEdgeSyncRequest.getTenantId(), msg);
startSyncProcess(tenantId, toEdgeSyncRequest.getEdgeId(), toEdgeSyncRequest.getId());
break;
case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG:
FromEdgeSyncResponse fromEdgeSyncResponse = (FromEdgeSyncResponse) msg;
log.trace("[{}] fromEdgeSyncResponse [{}]", fromEdgeSyncResponse.getTenantId(), msg);
processSyncResponse(fromEdgeSyncResponse);
break;
}
});
}
@Override
public void updateEdge(TenantId tenantId, Edge edge) {
EdgeGrpcSession session = sessions.get(edge.getId());
if (session != null && session.isConnected()) {
log.debug("[{}] Updating configuration for edge [{}] [{}]", tenantId, edge.getName(), edge.getId());
session.onConfigurationUpdate(edge);
} else {
log.debug("[{}] Session doesn't exist for edge [{}] [{}]", tenantId, edge.getName(), edge.getId());
}
executorService.execute(() -> {
EdgeGrpcSession session = sessions.get(edge.getId());
if (session != null && session.isConnected()) {
log.debug("[{}] Updating configuration for edge [{}] [{}]", tenantId, edge.getName(), edge.getId());
session.onConfigurationUpdate(edge);
} else {
log.debug("[{}] Session doesn't exist for edge [{}] [{}]", tenantId, edge.getName(), edge.getId());
}
});
}
@Override
public void deleteEdge(TenantId tenantId, EdgeId edgeId) {
executorService.execute(() -> {
EdgeGrpcSession session = sessions.get(edgeId);
if (session != null && session.isConnected()) {
log.info("[{}] Closing and removing session for edge [{}]", tenantId, edgeId);
session.close();
sessions.remove(edgeId);
final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock());
newEventLock.lock();
try {
sessionNewEvents.remove(edgeId);
} finally {
newEventLock.unlock();
}
cancelScheduleEdgeEventsCheck(edgeId);
}
});
}
private void onEdgeEvent(TenantId tenantId, EdgeId edgeId) {
EdgeGrpcSession session = sessions.get(edgeId);
if (session != null && session.isConnected()) {
log.info("[{}] Closing and removing session for edge [{}]", tenantId, edgeId);
session.close();
sessions.remove(edgeId);
log.trace("[{}] onEdgeEvent [{}]", tenantId, edgeId.getId());
final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock());
newEventLock.lock();
try {
sessionNewEvents.remove(edgeId);
if (Boolean.FALSE.equals(sessionNewEvents.get(edgeId))) {
log.trace("[{}] set session new events flag to true [{}]", tenantId, edgeId.getId());
sessionNewEvents.put(edgeId, true);
}
} finally {
newEventLock.unlock();
}
cancelScheduleEdgeEventsCheck(edgeId);
}
}
@Override
public void onEdgeEvent(TenantId tenantId, EdgeId edgeId) {
log.trace("[{}] onEdgeEvent [{}]", tenantId, edgeId.getId());
final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock());
newEventLock.lock();
try {
if (Boolean.FALSE.equals(sessionNewEvents.get(edgeId))) {
log.trace("[{}] set session new events flag to true [{}]", tenantId, edgeId.getId());
sessionNewEvents.put(edgeId, true);
}
} finally {
newEventLock.unlock();
}
}
@ -220,14 +266,47 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
scheduleEdgeEventsCheck(edgeGrpcSession);
}
@Override
public void startSyncProcess(TenantId tenantId, EdgeId edgeId) {
private void startSyncProcess(TenantId tenantId, EdgeId edgeId, UUID requestId) {
EdgeGrpcSession session = sessions.get(edgeId);
if (session != null && session.isConnected()) {
session.startSyncProcess(tenantId, edgeId);
if (session != null) {
boolean success = false;
if (session.isConnected()) {
session.startSyncProcess(tenantId, edgeId);
success = true;
}
clusterService.pushEdgeSyncResponseToCore(new FromEdgeSyncResponse(requestId, tenantId, edgeId, success));
}
}
@Override
public void processSyncRequest(ToEdgeSyncRequest request, Consumer<FromEdgeSyncResponse> responseConsumer) {
log.trace("[{}][{}] Processing sync edge request [{}]", request.getTenantId(), request.getId(), request.getEdgeId());
UUID requestId = request.getId();
localSyncEdgeRequests.put(requestId, responseConsumer);
clusterService.pushEdgeSyncRequestToCore(request);
scheduleSyncRequestTimeout(request, requestId);
}
private void scheduleSyncRequestTimeout(ToEdgeSyncRequest request, UUID requestId) {
log.trace("[{}] scheduling sync edge request", requestId);
executorService.schedule(() -> {
log.trace("[{}] checking if sync edge request is not processed...", requestId);
Consumer<FromEdgeSyncResponse> consumer = localSyncEdgeRequests.remove(requestId);
if (consumer != null) {
log.trace("[{}] timeout for processing sync edge request.", requestId);
consumer.accept(new FromEdgeSyncResponse(requestId, request.getTenantId(), request.getEdgeId(), false));
}
}, 20, TimeUnit.SECONDS);
}
private void processSyncResponse(FromEdgeSyncResponse response) {
log.trace("[{}] Received response from sync service: [{}]", response.getId(), response);
UUID requestId = response.getId();
Consumer<FromEdgeSyncResponse> consumer = localSyncEdgeRequests.remove(requestId);
if (consumer != null) {
consumer.accept(response);
} else {
log.error("[{}] Edge is not connected [{}]", tenantId, edgeId);
throw new RuntimeException("Edge is not connected");
log.trace("[{}] Unknown or stale sync response received [{}]", requestId, response);
}
}

View File

@ -18,14 +18,19 @@ package org.thingsboard.server.service.edge.rpc;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.edge.EdgeSessionMsg;
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
import java.util.function.Consumer;
public interface EdgeRpcService {
void onToEdgeSessionMsg(TenantId tenantId, EdgeSessionMsg msg);
void updateEdge(TenantId tenantId, Edge edge);
void deleteEdge(TenantId tenantId, EdgeId edgeId);
void onEdgeEvent(TenantId tenantId, EdgeId edgeId);
void startSyncProcess(TenantId tenantId, EdgeId edgeId);
void processSyncRequest(ToEdgeSyncRequest request, Consumer<FromEdgeSyncResponse> responseConsumer);
}

View File

@ -80,10 +80,12 @@ import org.thingsboard.server.service.edge.rpc.constructor.RuleChainMsgConstruct
import org.thingsboard.server.service.edge.rpc.constructor.UserMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.WidgetTypeMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.WidgetsBundleMsgConstructor;
import org.thingsboard.server.service.entitiy.TbNotificationEntityService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import java.util.ArrayList;
import java.util.List;
@ -94,6 +96,12 @@ public abstract class BaseEdgeProcessor {
protected static final int DEFAULT_PAGE_SIZE = 100;
@Autowired
protected TelemetrySubscriptionService tsSubService;
@Autowired
protected TbNotificationEntityService notificationEntityService;
@Autowired
protected RuleChainService ruleChainService;

View File

@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.asset.AssetProfile;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
@ -58,6 +59,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.util.JsonUtils;
import org.thingsboard.server.controller.BaseController;
import org.thingsboard.server.gen.edge.v1.AttributeDeleteMsg;
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
import org.thingsboard.server.gen.edge.v1.EntityDataProto;
@ -102,7 +104,7 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
}
if (entityData.hasAttributesUpdatedMsg()) {
metaData.putValue("scope", entityData.getPostAttributeScope());
result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData));
result.add(processAttributesUpdate(tenantId, entityId, entityData.getAttributesUpdatedMsg(), metaData));
}
if (entityData.hasPostTelemetryMsg()) {
result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData));
@ -225,39 +227,36 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
return futureToSet;
}
private ListenableFuture<Void> processAttributesUpdate(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) {
private ListenableFuture<Void> processAttributesUpdate(TenantId tenantId,
EntityId entityId,
TransportProtos.PostAttributeMsg msg,
TbMsgMetaData metaData) {
SettableFuture<Void> futureToSet = SettableFuture.create();
JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(json);
ListenableFuture<List<String>> future = attributesService.save(tenantId, entityId, metaData.getValue("scope"), new ArrayList<>(attributes));
Futures.addCallback(future, new FutureCallback<>() {
List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(json));
String scope = metaData.getValue("scope");
tsSubService.saveAndNotify(tenantId, entityId, scope, attributes, new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable List<String> keys) {
var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
TbMsg tbMsg = TbMsg.newMsg(defaultQueueAndRuleChain.getKey(), DataConstants.ATTRIBUTES_UPDATED, entityId, customerId, metaData, gson.toJson(json), defaultQueueAndRuleChain.getValue(), null);
tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
futureToSet.set(null);
}
@Override
public void onFailure(Throwable t) {
log.error("Can't process attributes update [{}]", msg, t);
futureToSet.setException(t);
}
});
public void onSuccess(@Nullable Void tmp) {
logAttributesUpdated(tenantId, entityId, scope, attributes, null);
futureToSet.set(null);
}
@Override
public void onFailure(Throwable t) {
log.error("Can't process attributes update [{}]", msg, t);
logAttributesUpdated(tenantId, entityId, scope, attributes, t);
futureToSet.setException(t);
}
}, dbCallbackExecutorService);
});
return futureToSet;
}
private void logAttributesUpdated(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, Throwable e) {
notificationEntityService.logEntityAction(tenantId, entityId, ActionType.ATTRIBUTES_UPDATED, null,
BaseController.toException(e), scope, attributes);
}
private ListenableFuture<Void> processAttributeDeleteMsg(TenantId tenantId, EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, String entityType) {
SettableFuture<Void> futureToSet = SettableFuture.create();
String scope = attributeDeleteMsg.getScope();

View File

@ -15,8 +15,6 @@
*/
package org.thingsboard.server.service.install;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
@ -25,10 +23,7 @@ import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.QueueId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
@ -37,8 +32,6 @@ import org.thingsboard.server.common.data.queue.ProcessingStrategyType;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.data.queue.SubmitStrategy;
import org.thingsboard.server.common.data.queue.SubmitStrategyType;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration;
import org.thingsboard.server.dao.asset.AssetProfileService;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.dashboard.DashboardService;
@ -63,11 +56,8 @@ import java.sql.SQLException;
import java.sql.SQLSyntaxErrorException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.thingsboard.server.service.install.DatabaseHelper.ADDITIONAL_INFO;
import static org.thingsboard.server.service.install.DatabaseHelper.ASSIGNED_CUSTOMERS;
@ -620,8 +610,8 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
case "3.4.1":
try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
log.info("Updating schema ...");
runSchemaUpdateScript(conn, "3.4.1");
if (isOldSchema(conn, 3004001)) {
try {
conn.createStatement().execute("ALTER TABLE asset ADD COLUMN asset_profile_id uuid");
} catch (Exception e) {
@ -669,6 +659,11 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
}
}
private void runSchemaUpdateScript(Connection connection, String version) throws Exception {
Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", version, SCHEMA_UPDATE_SQL);
loadSql(schemaUpdateFile, connection);
}
private void loadSql(Path sqlFile, Connection conn) throws Exception {
String sql = new String(Files.readAllBytes(sqlFile), Charset.forName("UTF-8"));
Statement st = conn.createStatement();

View File

@ -63,6 +63,7 @@ import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.alarm.AlarmDao;
import org.thingsboard.server.dao.audit.AuditLogDao;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.event.EventService;
@ -138,6 +139,9 @@ public class DefaultDataUpdateService implements DataUpdateService {
@Autowired
private EventService eventService;
@Autowired
private AuditLogDao auditLogDao;
@Override
public void updateData(String fromVersion) throws Exception {
switch (fromVersion) {
@ -170,12 +174,22 @@ public class DefaultDataUpdateService implements DataUpdateService {
rateLimitsUpdater.updateEntities();
break;
case "3.4.0":
String skipEventsMigration = System.getenv("TB_SKIP_EVENTS_MIGRATION");
if (skipEventsMigration == null || skipEventsMigration.equalsIgnoreCase("false")) {
boolean skipEventsMigration = getEnv("TB_SKIP_EVENTS_MIGRATION", false);
if (!skipEventsMigration) {
log.info("Updating data from version 3.4.0 to 3.4.1 ...");
eventService.migrateEvents();
}
break;
case "3.4.1":
boolean skipAuditLogsMigration = getEnv("TB_SKIP_AUDIT_LOGS_MIGRATION", false);
if (!skipAuditLogsMigration) {
log.info("Updating data from version 3.4.1 to 3.4.2 ...");
log.info("Starting audit logs migration. Can be skipped with TB_SKIP_AUDIT_LOGS_MIGRATION env variable set to true");
auditLogDao.migrateAuditLogs();
} else {
log.info("Skipping audit logs migration");
}
break;
default:
throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion);
}
@ -645,4 +659,13 @@ public class DefaultDataUpdateService implements DataUpdateService {
return mainQueueConfiguration;
}
private boolean getEnv(String name, boolean defaultValue) {
String env = System.getenv(name);
if (env == null) {
return defaultValue;
} else {
return Boolean.parseBoolean(env);
}
}
}

View File

@ -51,6 +51,8 @@ import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg;
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
@ -372,12 +374,32 @@ public class DefaultTbClusterService implements TbClusterService {
log.trace("[{}] Processing edge {} event update ", tenantId, edgeId);
EdgeEventUpdateMsg msg = new EdgeEventUpdateMsg(tenantId, edgeId);
byte[] msgBytes = encodingService.encode(msg);
ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setEdgeEventUpdateMsg(ByteString.copyFrom(msgBytes)).build();
pushEdgeSyncMsgToCore(edgeId, toCoreMsg);
}
@Override
public void pushEdgeSyncRequestToCore(ToEdgeSyncRequest toEdgeSyncRequest) {
log.trace("[{}] Processing edge sync request {} ", toEdgeSyncRequest.getTenantId(), toEdgeSyncRequest);
byte[] msgBytes = encodingService.encode(toEdgeSyncRequest);
ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setToEdgeSyncRequestMsg(ByteString.copyFrom(msgBytes)).build();
pushEdgeSyncMsgToCore(toEdgeSyncRequest.getEdgeId(), toCoreMsg);
}
@Override
public void pushEdgeSyncResponseToCore(FromEdgeSyncResponse fromEdgeSyncResponse) {
log.trace("[{}] Processing edge sync response {}", fromEdgeSyncResponse.getTenantId(), fromEdgeSyncResponse);
byte[] msgBytes = encodingService.encode(fromEdgeSyncResponse);
ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setFromEdgeSyncResponseMsg(ByteString.copyFrom(msgBytes)).build();
pushEdgeSyncMsgToCore(fromEdgeSyncResponse.getEdgeId(), toCoreMsg);
}
private void pushEdgeSyncMsgToCore(EdgeId edgeId, ToCoreNotificationMsg toCoreMsg) {
TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer();
Set<String> tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE);
for (String serviceId : tbCoreServices) {
TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, serviceId);
ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setEdgeEventUpdateMsg(ByteString.copyFrom(msgBytes)).build();
toCoreNfProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEdgeId().getId(), toCoreMsg), null);
toCoreNfProducer.send(tpi, new TbProtoQueueMsg<>(edgeId.getId(), toCoreMsg), null);
toCoreNfs.incrementAndGet();
}
}

View File

@ -20,8 +20,6 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
@ -77,7 +75,6 @@ import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.subscription.SubscriptionManagerService;
import org.thingsboard.server.service.subscription.TbLocalSubscriptionService;
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
import org.thingsboard.server.service.sync.vc.EntitiesVersionControlService;
import org.thingsboard.server.service.sync.vc.GitVersionControlQueueService;
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
@ -319,13 +316,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
} else if (toCoreNotification.getComponentLifecycleMsg() != null && !toCoreNotification.getComponentLifecycleMsg().isEmpty()) {
handleComponentLifecycleMsg(id, toCoreNotification.getComponentLifecycleMsg());
callback.onSuccess();
} else if (toCoreNotification.getEdgeEventUpdateMsg() != null && !toCoreNotification.getEdgeEventUpdateMsg().isEmpty()) {
Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreNotification.getEdgeEventUpdateMsg().toByteArray());
if (actorMsg.isPresent()) {
log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
actorContext.tellWithHighPriority(actorMsg.get());
}
callback.onSuccess();
} else if (!toCoreNotification.getEdgeEventUpdateMsg().isEmpty()) {
forwardToAppActor(id, encodingService.decode(toCoreNotification.getEdgeEventUpdateMsg().toByteArray()), callback);
} else if (!toCoreNotification.getToEdgeSyncRequestMsg().isEmpty()) {
forwardToAppActor(id, encodingService.decode(toCoreNotification.getToEdgeSyncRequestMsg().toByteArray()), callback);
} else if (!toCoreNotification.getFromEdgeSyncResponseMsg().isEmpty()) {
forwardToAppActor(id, encodingService.decode(toCoreNotification.getFromEdgeSyncResponseMsg().toByteArray()), callback);
} else if (toCoreNotification.hasQueueUpdateMsg()) {
TransportProtos.QueueUpdateMsg queue = toCoreNotification.getQueueUpdateMsg();
partitionService.updateQueue(queue);
@ -554,6 +550,14 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
actorContext.tell(new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback));
}
private void forwardToAppActor(UUID id, Optional<TbActorMsg> actorMsg, TbCallback callback) {
if (actorMsg.isPresent()) {
log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
actorContext.tell(actorMsg.get());
}
callback.onSuccess();
}
private void throwNotHandled(Object msg, TbCallback callback) {
log.warn("Message not handled: {}", msg);
callback.onFailure(new RuntimeException("Message not handled!"));

View File

@ -0,0 +1,61 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.ttl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.thingsboard.server.dao.audit.AuditLogDao;
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
import org.thingsboard.server.queue.discovery.PartitionService;
import java.util.concurrent.TimeUnit;
import static org.thingsboard.server.dao.model.ModelConstants.AUDIT_LOG_COLUMN_FAMILY_NAME;
@Service
@ConditionalOnExpression("${sql.ttl.audit_logs.enabled:true} && ${sql.ttl.audit_logs.ttl:0} > 0")
@Slf4j
public class AuditLogsCleanUpService extends AbstractCleanUpService {
private final AuditLogDao auditLogDao;
private final SqlPartitioningRepository partitioningRepository;
@Value("${sql.ttl.audit_logs.ttl:0}")
private long ttlInSec;
@Value("${sql.audit_logs.partition_size:168}")
private int partitionSizeInHours;
public AuditLogsCleanUpService(PartitionService partitionService, AuditLogDao auditLogDao, SqlPartitioningRepository partitioningRepository) {
super(partitionService);
this.auditLogDao = auditLogDao;
this.partitioningRepository = partitioningRepository;
}
@Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.audit_logs.checking_interval_ms})}",
fixedDelayString = "${sql.ttl.audit_logs.checking_interval_ms}")
public void cleanUp() {
long auditLogsExpTime = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttlInSec);
if (isSystemTenantPartitionMine()) {
auditLogDao.cleanUpAuditLogs(auditLogsExpTime);
} else {
partitioningRepository.cleanupPartitionsCache(AUDIT_LOG_COLUMN_FAMILY_NAME, auditLogsExpTime, TimeUnit.HOURS.toMillis(partitionSizeInHours));
}
}
}

View File

@ -265,6 +265,8 @@ sql:
batch_size: "${SQL_EDGE_EVENTS_BATCH_SIZE:1000}"
batch_max_delay: "${SQL_EDGE_EVENTS_BATCH_MAX_DELAY_MS:100}"
stats_print_interval_ms: "${SQL_EDGE_EVENTS_BATCH_STATS_PRINT_MS:10000}"
audit_logs:
partition_size: "${SQL_AUDIT_LOGS_PARTITION_SIZE_HOURS:168}" # Default value - 1 week
# Specify whether to sort entities before batch update. Should be enabled for cluster mode to avoid deadlocks
batch_sort: "${SQL_BATCH_SORT:false}"
# Specify whether to remove null characters from strValue of attributes and timeseries before insert
@ -303,6 +305,10 @@ sql:
rpc:
enabled: "${SQL_TTL_RPC_ENABLED:true}"
checking_interval: "${SQL_RPC_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours
audit_logs:
enabled: "${SQL_TTL_AUDIT_LOGS_ENABLED:true}"
ttl: "${SQL_TTL_AUDIT_LOGS_SECS:0}" # Disabled by default. Accuracy of the cleanup depends on the sql.audit_logs.partition_size
checking_interval_ms: "${SQL_TTL_AUDIT_LOGS_CHECKING_INTERVAL_MS:86400000}" # Default value - 1 day
relations:
max_level: "${SQL_RELATIONS_MAX_LEVEL:50}" # //This value has to be reasonable small to prevent infinite recursion as early as possible

View File

@ -20,18 +20,33 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.audit.AuditLog;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.dao.audit.AuditLogDao;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
import org.thingsboard.server.service.ttl.AuditLogsCleanUpService;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
public abstract class BaseAuditLogControllerTest extends AbstractControllerTest {
@ -39,6 +54,18 @@ public abstract class BaseAuditLogControllerTest extends AbstractControllerTest
private Tenant savedTenant;
private User tenantAdmin;
@Autowired
private AuditLogDao auditLogDao;
@SpyBean
private SqlPartitioningRepository partitioningRepository;
@Autowired
private AuditLogsCleanUpService auditLogsCleanUpService;
@Value("#{${sql.audit_logs.partition_size} * 60 * 60 * 1000}")
private long partitionDurationInMs;
@Value("${sql.ttl.audit_logs.ttl}")
private long auditLogsTtlInSec;
@Before
public void beforeTest() throws Exception {
loginSysAdmin();
@ -145,4 +172,45 @@ public abstract class BaseAuditLogControllerTest extends AbstractControllerTest
Assert.assertEquals(179, loadedAuditLogs.size());
}
@Test
public void whenSavingNewAuditLog_thenCheckAndCreatePartitionIfNotExists() {
reset(partitioningRepository);
AuditLog auditLog = createAuditLog(ActionType.LOGIN, tenantAdminUserId);
verify(partitioningRepository).createPartitionIfNotExists(eq("audit_log"), eq(auditLog.getCreatedTime()), eq(partitionDurationInMs));
List<Long> partitions = partitioningRepository.fetchPartitions("audit_log");
assertThat(partitions).singleElement().satisfies(partitionStartTs -> {
assertThat(partitionStartTs).isEqualTo(partitioningRepository.calculatePartitionStartTime(auditLog.getCreatedTime(), partitionDurationInMs));
});
}
@Test
public void whenCleaningUpAuditLogsByTtl_thenDropOldPartitions() {
long oldAuditLogTs = LocalDate.of(2020, 10, 1).atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli();
long partitionStartTs = partitioningRepository.calculatePartitionStartTime(oldAuditLogTs, partitionDurationInMs);
partitioningRepository.createPartitionIfNotExists("audit_log", oldAuditLogTs, partitionDurationInMs);
List<Long> partitions = partitioningRepository.fetchPartitions("audit_log");
assertThat(partitions).contains(partitionStartTs);
auditLogsCleanUpService.cleanUp();
partitions = partitioningRepository.fetchPartitions("audit_log");
assertThat(partitions).doesNotContain(partitionStartTs);
assertThat(partitions).allSatisfy(partitionsStart -> {
long partitionEndTs = partitionsStart + partitionDurationInMs;
assertThat(partitionEndTs).isGreaterThan(System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(auditLogsTtlInSec));
});
}
private AuditLog createAuditLog(ActionType actionType, EntityId entityId) {
AuditLog auditLog = new AuditLog();
auditLog.setTenantId(tenantId);
auditLog.setCustomerId(null);
auditLog.setUserId(tenantAdminUserId);
auditLog.setEntityId(entityId);
auditLog.setUserName(tenantAdmin.getEmail());
auditLog.setActionType(actionType);
return auditLogDao.save(tenantId, auditLog);
}
}

View File

@ -21,9 +21,12 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.gson.JsonObject;
import com.google.protobuf.AbstractMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.test.context.TestPropertySource;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
@ -53,6 +56,8 @@ import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.gen.edge.v1.UplinkMsg;
import org.thingsboard.server.gen.edge.v1.UplinkResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.mqtt.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.MqttTestClient;
import java.util.List;
import java.util.Map;
@ -63,6 +68,9 @@ import java.util.concurrent.TimeUnit;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@TestPropertySource(properties = {
"transport.mqtt.enabled=true"
})
abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest {
@Test
@ -579,4 +587,40 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest {
return doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + device.getUuidId() + "/values/timeseries?keys=" + timeseriesKey,
new TypeReference<>() {});
}
@Test
public void sendUpdateSharedAttributeToCloudAndValidateDeviceSubscription() throws Exception {
Device device = saveDeviceOnCloudAndVerifyDeliveryToEdge();
DeviceCredentials deviceCredentials = doGet("/api/device/" + device.getUuidId() + "/credentials", DeviceCredentials.class);
MqttTestClient client = new MqttTestClient();
client.connectAndWait(deviceCredentials.getCredentialsId());
MqttTestCallback onUpdateCallback = new MqttTestCallback();
client.setCallback(onUpdateCallback);
client.subscribeAndWait("v1/devices/me/attributes", MqttQoS.AT_MOST_ONCE);
edgeImitator.expectResponsesAmount(1);
JsonObject attributesData = new JsonObject();
String attrKey = "sharedAttrName";
String attrValue = "sharedAttrValue";
attributesData.addProperty(attrKey, attrValue);
UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder();
EntityDataProto.Builder entityDataBuilder = EntityDataProto.newBuilder();
entityDataBuilder.setEntityType(device.getId().getEntityType().name());
entityDataBuilder.setEntityIdMSB(device.getId().getId().getMostSignificantBits());
entityDataBuilder.setEntityIdLSB(device.getId().getId().getLeastSignificantBits());
entityDataBuilder.setAttributesUpdatedMsg(JsonConverter.convertToAttributesProto(attributesData));
entityDataBuilder.setPostAttributeScope(DataConstants.SHARED_SCOPE);
uplinkMsgBuilder.addEntityData(entityDataBuilder.build());
edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
Assert.assertTrue(edgeImitator.waitForResponses());
Assert.assertTrue(onUpdateCallback.getSubscribeLatch().await(5, TimeUnit.SECONDS));
Assert.assertEquals(JacksonUtil.OBJECT_MAPPER.createObjectNode().put(attrKey, attrValue),
JacksonUtil.fromBytes(onUpdateCallback.getPayloadBytes()));
}
}

View File

@ -56,4 +56,7 @@ queue.rule-engine.queues[2].processing-strategy.retries=1
queue.rule-engine.queues[2].processing-strategy.pause-between-retries=0
queue.rule-engine.queues[2].processing-strategy.max-pause-between-retries=0
usage.stats.report.enabled=false
usage.stats.report.enabled=false
sql.audit_logs.partition_size=24
sql.ttl.audit_logs.ttl=2592000

View File

@ -29,6 +29,8 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg;
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
@ -88,5 +90,9 @@ public interface TbClusterService extends TbQueueClusterService {
void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId);
void pushEdgeSyncRequestToCore(ToEdgeSyncRequest toEdgeSyncRequest);
void pushEdgeSyncResponseToCore(FromEdgeSyncResponse fromEdgeSyncResponse);
void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action);
}

View File

@ -932,6 +932,8 @@ message ToCoreNotificationMsg {
QueueUpdateMsg queueUpdateMsg = 5;
QueueDeleteMsg queueDeleteMsg = 6;
VersionControlResponseMsg vcResponseMsg = 7;
bytes toEdgeSyncRequestMsg = 8;
bytes fromEdgeSyncResponseMsg = 9;
}
/* Messages that are handled by ThingsBoard RuleEngine Service */

View File

@ -22,6 +22,7 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import org.thingsboard.server.common.data.BaseData;
import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.validation.NoXss;
@ApiModel
@EqualsAndHashCode(callSuper = true)
@ -34,6 +35,7 @@ public class AuditLog extends BaseData<AuditLogId> {
private CustomerId customerId;
@ApiModelProperty(position = 5, value = "JSON object with Entity id", accessMode = ApiModelProperty.AccessMode.READ_ONLY)
private EntityId entityId;
@NoXss
@ApiModelProperty(position = 6, value = "Name of the logged entity", example = "Thermometer", accessMode = ApiModelProperty.AccessMode.READ_ONLY)
private String entityName;
@ApiModelProperty(position = 7, value = "JSON object with User id.", accessMode = ApiModelProperty.AccessMode.READ_ONLY)

View File

@ -122,6 +122,12 @@ public enum MsgType {
/**
* Message that is sent on Edge Event to Edge Session
*/
EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG;
EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG,
/**
* Messages that are sent to and from edge session to start edge synchronization process
*/
EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG,
EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG;
}

View File

@ -20,11 +20,9 @@ import lombok.ToString;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
@ToString
public class EdgeEventUpdateMsg implements TenantAwareMsg, ToAllNodesMsg {
public class EdgeEventUpdateMsg implements EdgeSessionMsg {
@Getter
private final TenantId tenantId;
@Getter

View File

@ -0,0 +1,24 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg.edge;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
import java.io.Serializable;
public interface EdgeSessionMsg extends TenantAwareMsg, ToAllNodesMsg {
}

View File

@ -0,0 +1,39 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg.edge;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import java.util.UUID;
@AllArgsConstructor
@Getter
public class FromEdgeSyncResponse implements EdgeSessionMsg {
private final UUID id;
private final TenantId tenantId;
private final EdgeId edgeId;
private final boolean success;
@Override
public MsgType getMsgType() {
return MsgType.EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG;
}
}

View File

@ -0,0 +1,37 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg.edge;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import java.util.UUID;
@AllArgsConstructor
@Getter
public class ToEdgeSyncRequest implements EdgeSessionMsg {
private final UUID id;
private final TenantId tenantId;
private final EdgeId edgeId;
@Override
public MsgType getMsgType() {
return MsgType.EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG;
}
}

View File

@ -39,4 +39,9 @@ public interface AuditLogDao extends Dao<AuditLog> {
PageData<AuditLog> findAuditLogsByTenantIdAndUserId(UUID tenantId, UserId userId, List<ActionType> actionTypes, TimePageLink pageLink);
PageData<AuditLog> findAuditLogsByTenantId(UUID tenantId, List<ActionType> actionTypes, TimePageLink pageLink);
void cleanUpAuditLogs(long expTime);
void migrateAuditLogs();
}

View File

@ -23,13 +23,13 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.HasName;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.audit.ActionStatus;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.audit.AuditLog;
@ -382,7 +382,15 @@ public class AuditLogServiceImpl implements AuditLogService {
AuditLog auditLogEntry = createAuditLogEntry(tenantId, entityId, entityName, customerId, userId, userName,
actionType, actionData, actionStatus, actionFailureDetails);
log.trace("Executing logAction [{}]", auditLogEntry);
auditLogValidator.validate(auditLogEntry, AuditLog::getTenantId);
try {
auditLogValidator.validate(auditLogEntry, AuditLog::getTenantId);
} catch (Exception e) {
if (StringUtils.contains(e.getMessage(), "value is malformed")) {
auditLogEntry.setEntityName("MALFORMED");
} else {
return Futures.immediateFailedFuture(e);
}
}
List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY);
futures.add(auditLogDao.saveByTenantId(auditLogEntry));

View File

@ -62,7 +62,7 @@ public abstract class DataValidator<D extends BaseData<?>> {
}
return old;
} catch (DataValidationException e) {
log.error("Data object is invalid: [{}]", e.getMessage());
log.error("{} object is invalid: [{}]", data == null ? "Data" : data.getClass().getSimpleName(), e.getMessage());
throw e;
}
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sql;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import javax.sql.DataSource;
import java.sql.SQLException;
@ -32,6 +33,9 @@ public abstract class JpaAbstractDaoListeningExecutorService {
@Autowired
protected DataSource dataSource;
@Autowired
protected JdbcTemplate jdbcTemplate;
protected void printWarnings(Statement statement) throws SQLException {
SQLWarning warnings = statement.getWarnings();
if (warnings != null) {

View File

@ -15,33 +15,52 @@
*/
package org.thingsboard.server.dao.sql.audit;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.google.common.util.concurrent.ListenableFuture;
import org.springframework.beans.factory.annotation.Autowired;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.audit.AuditLog;
import org.thingsboard.server.common.data.id.AuditLogId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.audit.AuditLogDao;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.model.sql.AuditLogEntity;
import org.thingsboard.server.dao.sql.JpaAbstractDao;
import org.thingsboard.server.dao.util.SqlDao;
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@Component
@SqlDao
@RequiredArgsConstructor
@Slf4j
public class JpaAuditLogDao extends JpaAbstractDao<AuditLogEntity, AuditLog> implements AuditLogDao {
@Autowired
private AuditLogRepository auditLogRepository;
private final AuditLogRepository auditLogRepository;
private final SqlPartitioningRepository partitioningRepository;
private final JdbcTemplate jdbcTemplate;
@Value("${sql.audit_logs.partition_size:168}")
private int partitionSizeInHours;
@Value("${sql.ttl.audit_logs.ttl:0}")
private long ttlInSec;
private static final String TABLE_NAME = ModelConstants.AUDIT_LOG_COLUMN_FAMILY_NAME;
@Override
protected Class<AuditLogEntity> getEntityClass() {
@ -61,6 +80,17 @@ public class JpaAuditLogDao extends JpaAbstractDao<AuditLogEntity, AuditLog> imp
});
}
@Override
public AuditLog save(TenantId tenantId, AuditLog auditLog) {
if (auditLog.getId() == null) {
UUID uuid = Uuids.timeBased();
auditLog.setId(new AuditLogId(uuid));
auditLog.setCreatedTime(Uuids.unixTimestamp(uuid));
}
partitioningRepository.createPartitionIfNotExists(TABLE_NAME, auditLog.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
return super.save(tenantId, auditLog);
}
@Override
public PageData<AuditLog> findAuditLogsByTenantIdAndEntityId(UUID tenantId, EntityId entityId, List<ActionType> actionTypes, TimePageLink pageLink) {
return DaoUtil.toPageData(
@ -115,4 +145,41 @@ public class JpaAuditLogDao extends JpaAbstractDao<AuditLogEntity, AuditLog> imp
actionTypes,
DaoUtil.toPageable(pageLink)));
}
@Override
public void cleanUpAuditLogs(long expTime) {
partitioningRepository.dropPartitionsBefore(TABLE_NAME, expTime, TimeUnit.HOURS.toMillis(partitionSizeInHours));
}
@Override
public void migrateAuditLogs() {
long startTime = ttlInSec > 0 ? System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttlInSec) : 1480982400000L;
long currentTime = System.currentTimeMillis();
var partitionStepInMs = TimeUnit.HOURS.toMillis(partitionSizeInHours);
long numberOfPartitions = (currentTime - startTime) / partitionStepInMs;
if (numberOfPartitions > 1000) {
String error = "Please adjust your audit logs partitioning configuration. Configuration with partition size " +
"of " + partitionSizeInHours + " hours and corresponding TTL will use " + numberOfPartitions + " " +
"(> 1000) partitions which is not recommended!";
log.error(error);
throw new RuntimeException(error);
}
while (startTime < currentTime) {
var endTime = startTime + partitionStepInMs;
log.info("Migrating audit logs for time period: {} - {}", startTime, endTime);
callMigrationFunction(startTime, endTime, partitionStepInMs);
startTime = endTime;
}
log.info("Audit logs migration finished");
jdbcTemplate.execute("DROP TABLE IF EXISTS old_audit_log");
}
private void callMigrationFunction(long startTime, long endTime, long partitionSizeInMs) {
jdbcTemplate.update("CALL migrate_audit_logs(?, ?, ?)", startTime, endTime, partitionSizeInMs);
}
}

View File

@ -18,10 +18,8 @@ package org.thingsboard.server.dao.sql.event;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.hibernate.exception.ConstraintViolationException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.event.ErrorEventFilter;
@ -43,7 +41,6 @@ import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
import org.thingsboard.server.dao.timeseries.SqlPartition;
import org.thingsboard.server.dao.util.SqlDao;
import javax.annotation.PostConstruct;
@ -54,7 +51,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
/**
@ -65,9 +61,6 @@ import java.util.function.Function;
@SqlDao
public class JpaBaseEventDao implements EventDao {
private final Map<EventType, Map<Long, SqlPartition>> partitionsByEventType = new ConcurrentHashMap<>();
private static final ReentrantLock partitionCreationLock = new ReentrantLock();
@Autowired
private EventPartitionConfiguration partitionConfiguration;
@ -122,9 +115,6 @@ public class JpaBaseEventDao implements EventDao {
@PostConstruct
private void init() {
for (EventType eventType : EventType.values()) {
partitionsByEventType.put(eventType, new ConcurrentHashMap<>());
}
TbSqlBlockingQueueParams params = TbSqlBlockingQueueParams.builder()
.logName("Events")
.batchSize(batchSize)
@ -165,42 +155,11 @@ public class JpaBaseEventDao implements EventDao {
event.setCreatedTime(System.currentTimeMillis());
}
}
savePartitionIfNotExist(event);
partitioningRepository.createPartitionIfNotExists(event.getType().getTable(), event.getCreatedTime(),
partitionConfiguration.getPartitionSizeInMs(event.getType()));
return queue.add(event);
}
private void savePartitionIfNotExist(Event event) {
EventType type = event.getType();
var partitionsMap = partitionsByEventType.get(type);
var partitionDuration = partitionConfiguration.getPartitionSizeInMs(type);
long partitionStartTs = event.getCreatedTime() - (event.getCreatedTime() % partitionDuration);
if (partitionsMap.get(partitionStartTs) == null) {
savePartition(partitionsMap, new SqlPartition(type.getTable(), partitionStartTs, partitionStartTs + partitionDuration, Long.toString(partitionStartTs)));
}
}
private void savePartition(Map<Long, SqlPartition> partitionsMap, SqlPartition sqlPartition) {
if (!partitionsMap.containsKey(sqlPartition.getStart())) {
partitionCreationLock.lock();
try {
log.trace("Saving partition: {}", sqlPartition);
partitioningRepository.save(sqlPartition);
log.trace("Adding partition to map: {}", sqlPartition);
partitionsMap.put(sqlPartition.getStart(), sqlPartition);
} catch (DataIntegrityViolationException ex) {
log.trace("Error occurred during partition save:", ex);
if (ex.getCause() instanceof ConstraintViolationException) {
log.warn("Saving partition [{}] rejected. Event data will save to the DEFAULT partition.", sqlPartition.getPartitionDate());
partitionsMap.put(sqlPartition.getStart(), sqlPartition);
} else {
throw new RuntimeException(ex);
}
} finally {
partitionCreationLock.unlock();
}
}
}
@Override
public PageData<? extends Event> findEvents(UUID tenantId, UUID entityId, EventType eventType, TimePageLink pageLink) {
return DaoUtil.toPageData(getEventRepository(eventType).findEvents(tenantId, entityId, pageLink.getStartTime(), pageLink.getEndTime(), DaoUtil.toPageable(pageLink, EventEntity.eventColumnMap)));
@ -438,23 +397,24 @@ public class JpaBaseEventDao implements EventDao {
log.info("Going to cleanup regular events with exp time: {}", regularEventExpTs);
if (cleanupDb) {
eventCleanupRepository.cleanupEvents(regularEventExpTs, false);
} else {
cleanupPartitionsCache(regularEventExpTs, false);
}
cleanupPartitions(regularEventExpTs, false);
}
if (debugEventExpTs > 0) {
log.info("Going to cleanup debug events with exp time: {}", debugEventExpTs);
if (cleanupDb) {
eventCleanupRepository.cleanupEvents(debugEventExpTs, true);
} else {
cleanupPartitionsCache(debugEventExpTs, true);
}
cleanupPartitions(debugEventExpTs, true);
}
}
private void cleanupPartitions(long expTime, boolean isDebug) {
private void cleanupPartitionsCache(long expTime, boolean isDebug) {
for (EventType eventType : EventType.values()) {
if (eventType.isDebug() == isDebug) {
Map<Long, SqlPartition> partitions = partitionsByEventType.get(eventType);
partitions.keySet().removeIf(startTs -> startTs + partitionConfiguration.getPartitionSizeInMs(eventType) < expTime);
partitioningRepository.cleanupPartitionsCache(eventType.getTable(), expTime, partitionConfiguration.getPartitionSizeInMs(eventType));
}
}
}

View File

@ -17,16 +17,12 @@ package org.thingsboard.server.dao.sql.event;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.stereotype.Repository;
import org.thingsboard.server.common.data.event.EventType;
import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -34,13 +30,10 @@ import java.util.concurrent.TimeUnit;
@Repository
public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorService implements EventCleanupRepository {
private static final String SELECT_PARTITIONS_STMT = "SELECT tablename from pg_tables WHERE schemaname = 'public' and tablename like concat(?, '_%')";
private static final int PSQL_VERSION_14 = 140000;
@Autowired
private EventPartitionConfiguration partitionConfiguration;
private volatile Integer currentServerVersion;
@Autowired
private SqlPartitioningRepository partitioningRepository;
@Override
public void cleanupEvents(long eventExpTime, boolean debug) {
@ -59,16 +52,13 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe
callMigrateFunctionByPartitions("regular", "migrate_regular_events", regularEventTs, partitionConfiguration.getRegularPartitionSizeInHours());
callMigrateFunctionByPartitions("debug", "migrate_debug_events", debugEventTs, partitionConfiguration.getDebugPartitionSizeInHours());
try (Connection connection = dataSource.getConnection();
PreparedStatement dropFunction1 = connection.prepareStatement("DROP PROCEDURE IF EXISTS migrate_regular_events(bigint, bigint, int)");
PreparedStatement dropFunction2 = connection.prepareStatement("DROP PROCEDURE IF EXISTS migrate_debug_events(bigint, bigint, int)");
PreparedStatement dropTable = connection.prepareStatement("DROP TABLE IF EXISTS event")) {
dropFunction1.execute();
dropFunction2.execute();
dropTable.execute();
} catch (SQLException e) {
log.error("SQLException occurred during drop of the `events` table", e);
throw new RuntimeException(e);
try {
jdbcTemplate.execute("DROP PROCEDURE IF EXISTS migrate_regular_events(bigint, bigint, int)");
jdbcTemplate.execute("DROP PROCEDURE IF EXISTS migrate_debug_events(bigint, bigint, int)");
jdbcTemplate.execute("DROP TABLE IF EXISTS event");
} catch (DataAccessException e) {
log.error("Error occurred during drop of the `events` table", e);
throw e;
}
}
@ -94,13 +84,9 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe
}
private void callMigrateFunction(String functionName, long startTs, long endTs, int partitionSizeInHours) {
try (Connection connection = dataSource.getConnection();
PreparedStatement stmt = connection.prepareStatement("call " + functionName + "(?,?,?)")) {
stmt.setLong(1, startTs);
stmt.setLong(2, endTs);
stmt.setInt(3, partitionSizeInHours);
stmt.execute();
} catch (SQLException e) {
try {
jdbcTemplate.update("CALL " + functionName + "(?, ?, ?)", startTs, endTs, partitionSizeInHours);
} catch (DataAccessException e) {
if (e.getMessage() == null || !e.getMessage().contains("relation \"event\" does not exist")) {
log.error("[{}] SQLException occurred during execution of {} with parameters {} and {}", functionName, startTs, partitionSizeInHours, e);
throw new RuntimeException(e);
@ -109,82 +95,7 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe
}
private void cleanupEvents(EventType eventType, long eventExpTime) {
var partitionDuration = partitionConfiguration.getPartitionSizeInMs(eventType);
List<Long> partitions = fetchPartitions(eventType);
for (var partitionTs : partitions) {
var partitionEndTs = partitionTs + partitionDuration;
if (partitionEndTs < eventExpTime) {
log.info("[{}] Detaching expired partition: [{}-{}]", eventType, partitionTs, partitionEndTs);
if (detachAndDropPartition(eventType, partitionTs)) {
log.info("[{}] Detached expired partition: {}", eventType, partitionTs);
}
} else {
log.debug("[{}] Skip valid partition: {}", eventType, partitionTs);
}
}
}
private List<Long> fetchPartitions(EventType eventType) {
List<Long> partitions = new ArrayList<>();
try (Connection connection = dataSource.getConnection();
PreparedStatement stmt = connection.prepareStatement(SELECT_PARTITIONS_STMT)) {
stmt.setString(1, eventType.getTable());
stmt.execute();
try (ResultSet resultSet = stmt.getResultSet()) {
while (resultSet.next()) {
String partitionTableName = resultSet.getString(1);
String partitionTsStr = partitionTableName.substring(eventType.getTable().length() + 1);
try {
partitions.add(Long.parseLong(partitionTsStr));
} catch (NumberFormatException nfe) {
log.warn("Failed to parse table name: {}", partitionTableName);
}
}
}
} catch (SQLException e) {
log.error("SQLException occurred during events TTL task execution ", e);
}
return partitions;
}
private boolean detachAndDropPartition(EventType eventType, long partitionTs) {
String tablePartition = eventType.getTable() + "_" + partitionTs;
String detachPsqlStmtStr = "ALTER TABLE " + eventType.getTable() + " DETACH PARTITION " + tablePartition;
if (getCurrentServerVersion() >= PSQL_VERSION_14) {
detachPsqlStmtStr += " CONCURRENTLY";
}
String dropStmtStr = "DROP TABLE " + tablePartition;
try (Connection connection = dataSource.getConnection();
PreparedStatement detachStmt = connection.prepareStatement(detachPsqlStmtStr);
PreparedStatement dropStmt = connection.prepareStatement(dropStmtStr)) {
detachStmt.execute();
dropStmt.execute();
return true;
} catch (SQLException e) {
log.error("[{}] SQLException occurred during detach and drop of the partition: {}", eventType, partitionTs, e);
}
return false;
}
private synchronized int getCurrentServerVersion() {
if (currentServerVersion == null) {
try (Connection connection = dataSource.getConnection();
PreparedStatement versionStmt = connection.prepareStatement("SELECT current_setting('server_version_num')")) {
versionStmt.execute();
try (ResultSet resultSet = versionStmt.getResultSet()) {
while (resultSet.next()) {
currentServerVersion = resultSet.getInt(1);
}
}
} catch (SQLException e) {
log.warn("SQLException occurred during fetch of the server version", e);
}
if (currentServerVersion == null) {
currentServerVersion = 0;
}
}
return currentServerVersion;
partitioningRepository.dropPartitionsBefore(eventType.getTable(), eventExpTime, partitionConfiguration.getPartitionSizeInMs(eventType));
}
}

View File

@ -15,23 +15,148 @@
*/
package org.thingsboard.server.dao.sqlts.insert.sql;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.dao.timeseries.SqlPartition;
import org.thingsboard.server.dao.util.SqlTsDao;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
@Repository
@Transactional
@Slf4j
public class SqlPartitioningRepository {
@PersistenceContext
private EntityManager entityManager;
@Autowired
private JdbcTemplate jdbcTemplate;
private static final String SELECT_PARTITIONS_STMT = "SELECT tablename from pg_tables WHERE schemaname = 'public' and tablename like concat(?, '_%')";
private static final int PSQL_VERSION_14 = 140000;
private volatile Integer currentServerVersion;
private final Map<String, Map<Long, SqlPartition>> tablesPartitions = new ConcurrentHashMap<>();
private final ReentrantLock partitionCreationLock = new ReentrantLock();
@Transactional
public void save(SqlPartition partition) {
entityManager.createNativeQuery(partition.getQuery()).executeUpdate();
}
@Transactional
public void createPartitionIfNotExists(String table, long entityTs, long partitionDurationMs) {
long partitionStartTs = calculatePartitionStartTime(entityTs, partitionDurationMs);
Map<Long, SqlPartition> partitions = tablesPartitions.computeIfAbsent(table, t -> new ConcurrentHashMap<>());
if (!partitions.containsKey(partitionStartTs)) {
SqlPartition partition = new SqlPartition(table, partitionStartTs, partitionStartTs + partitionDurationMs, Long.toString(partitionStartTs));
partitionCreationLock.lock();
try {
if (partitions.containsKey(partitionStartTs)) return;
log.trace("Saving partition: {}", partition);
save(partition);
log.trace("Adding partition to map: {}", partition);
partitions.put(partition.getStart(), partition);
} catch (RuntimeException e) {
log.trace("Error occurred during partition save:", e);
String msg = ExceptionUtils.getRootCauseMessage(e);
if (msg.contains("would overlap partition")) {
log.warn("Couldn't save {} partition for {}, data will be saved to the default partition. SQL error: {}",
partition.getPartitionDate(), table, msg);
partitions.put(partition.getStart(), partition);
} else {
throw e;
}
} finally {
partitionCreationLock.unlock();
}
}
}
public void dropPartitionsBefore(String table, long ts, long partitionDurationMs) {
List<Long> partitions = fetchPartitions(table);
for (Long partitionStartTime : partitions) {
long partitionEndTime = partitionStartTime + partitionDurationMs;
if (partitionEndTime < ts) {
log.info("[{}] Detaching expired partition: [{}-{}]", table, partitionStartTime, partitionEndTime);
boolean success = detachAndDropPartition(table, partitionStartTime);
if (success) {
log.info("[{}] Detached expired partition: {}", table, partitionStartTime);
}
} else {
log.debug("[{}] Skipping valid partition: {}", table, partitionStartTime);
}
}
}
public void cleanupPartitionsCache(String table, long expTime, long partitionDurationMs) {
Map<Long, SqlPartition> partitions = tablesPartitions.get(table);
if (partitions == null) return;
partitions.keySet().removeIf(startTime -> (startTime + partitionDurationMs) < expTime);
}
private boolean detachAndDropPartition(String table, long partitionTs) {
Map<Long, SqlPartition> cachedPartitions = tablesPartitions.get(table);
if (cachedPartitions != null) cachedPartitions.remove(partitionTs);
String tablePartition = table + "_" + partitionTs;
String detachPsqlStmtStr = "ALTER TABLE " + table + " DETACH PARTITION " + tablePartition;
if (getCurrentServerVersion() >= PSQL_VERSION_14) {
detachPsqlStmtStr += " CONCURRENTLY";
}
String dropStmtStr = "DROP TABLE " + tablePartition;
try {
jdbcTemplate.execute(detachPsqlStmtStr);
jdbcTemplate.execute(dropStmtStr);
return true;
} catch (DataAccessException e) {
log.error("[{}] Error occurred trying to detach and drop the partition {} ", table, partitionTs, e);
}
return false;
}
public List<Long> fetchPartitions(String table) {
List<Long> partitions = new ArrayList<>();
List<String> partitionsTables = jdbcTemplate.queryForList(SELECT_PARTITIONS_STMT, String.class, table);
for (String partitionTableName : partitionsTables) {
String partitionTsStr = partitionTableName.substring(table.length() + 1);
try {
partitions.add(Long.parseLong(partitionTsStr));
} catch (NumberFormatException nfe) {
log.warn("Failed to parse table name: {}", partitionTableName);
}
}
return partitions;
}
public long calculatePartitionStartTime(long ts, long partitionDuration) {
return ts - (ts % partitionDuration);
}
private synchronized int getCurrentServerVersion() {
if (currentServerVersion == null) {
try {
currentServerVersion = jdbcTemplate.queryForObject("SELECT current_setting('server_version_num')", Integer.class);
} catch (Exception e) {
log.warn("Error occurred during fetch of the server version", e);
}
if (currentServerVersion == null) {
currentServerVersion = 0;
}
}
return currentServerVersion;
}
}

View File

@ -48,7 +48,7 @@ CREATE INDEX IF NOT EXISTS idx_asset_type ON asset(tenant_id, type);
CREATE INDEX IF NOT EXISTS idx_attribute_kv_by_key_and_last_update_ts ON attribute_kv(entity_id, attribute_key, last_update_ts desc);
CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time);
CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time DESC);
CREATE INDEX IF NOT EXISTS idx_rpc_tenant_id_device_id ON rpc(tenant_id, device_id);

View File

@ -74,7 +74,7 @@ CREATE TABLE IF NOT EXISTS entity_alarm (
);
CREATE TABLE IF NOT EXISTS audit_log (
id uuid NOT NULL CONSTRAINT audit_log_pkey PRIMARY KEY,
id uuid NOT NULL,
created_time bigint NOT NULL,
tenant_id uuid,
customer_id uuid,
@ -87,7 +87,7 @@ CREATE TABLE IF NOT EXISTS audit_log (
action_data varchar(1000000),
action_status varchar(255),
action_failure_details varchar(1000000)
);
) PARTITION BY RANGE (created_time);
CREATE TABLE IF NOT EXISTS attribute_kv (
entity_type varchar(255),

View File

@ -42,7 +42,7 @@ public class NoXssValidatorTest {
"<p><a href=\"http://htmlbook.ru/example/knob.html\">Link!!!</a></p>1221",
"<h3>Please log in to proceed</h3> <form action=http://192.168.149.128>Username:<br><input type=\"username\" name=\"username\"></br>Password:<br><input type=\"password\" name=\"password\"></br><br><input type=\"submit\" value=\"Log in\"></br>",
" <img src= \"http://site.com/\" > ",
"123 <input type=text value=a onfocus=alert(1337) AUTOFOCUS>bebe",
"123 <input type=text value=a onfocus=alert(1337) AUTOFOCUS>bebe"
})
public void testIsNotValid(String stringWithXss) {
boolean isValid = validator.isValid(stringWithXss, mock(ConstraintValidatorContext.class));

View File

@ -38,7 +38,7 @@
<tb-postgres.docker.name>tb-postgres</tb-postgres.docker.name>
<tb-cassandra.docker.name>tb-cassandra</tb-cassandra.docker.name>
<pkg.installFolder>/usr/share/${pkg.name}</pkg.installFolder>
<pkg.upgradeVersion>3.3.3</pkg.upgradeVersion>
<pkg.upgradeVersion>3.4.2</pkg.upgradeVersion>
</properties>
<dependencies>