Remove redundant conversions into streams
This commit is contained in:
		
							parent
							
								
									2d840e77eb
								
							
						
					
					
						commit
						9ac9761780
					
				@ -121,7 +121,7 @@ public class AppActor extends ContextAwareActor {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    private void broadcast(Object msg) {
 | 
					    private void broadcast(Object msg) {
 | 
				
			||||||
        pluginManager.broadcast(msg);
 | 
					        pluginManager.broadcast(msg);
 | 
				
			||||||
        tenantActors.values().stream().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
 | 
					        tenantActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private void onToRuleMsg(ToRuleActorMsg msg) {
 | 
					    private void onToRuleMsg(ToRuleActorMsg msg) {
 | 
				
			||||||
 | 
				
			|||||||
@ -111,7 +111,7 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
 | 
				
			|||||||
            Optional<ServerAddress> newTargetServer = systemContext.getRoutingService().resolve(getDeviceId());
 | 
					            Optional<ServerAddress> newTargetServer = systemContext.getRoutingService().resolve(getDeviceId());
 | 
				
			||||||
            if (!newTargetServer.equals(currentTargetServer)) {
 | 
					            if (!newTargetServer.equals(currentTargetServer)) {
 | 
				
			||||||
                currentTargetServer = newTargetServer;
 | 
					                currentTargetServer = newTargetServer;
 | 
				
			||||||
                pendingMap.values().stream().forEach(v -> {
 | 
					                pendingMap.values().forEach(v -> {
 | 
				
			||||||
                    forwardToAppActor(context, v, currentTargetServer);
 | 
					                    forwardToAppActor(context, v, currentTargetServer);
 | 
				
			||||||
                    if (currentTargetServer.isPresent()) {
 | 
					                    if (currentTargetServer.isPresent()) {
 | 
				
			||||||
                        logger.debug("[{}] Forwarded msg to new server: {}", sessionId, currentTargetServer.get());
 | 
					                        logger.debug("[{}] Forwarded msg to new server: {}", sessionId, currentTargetServer.get());
 | 
				
			||||||
 | 
				
			|||||||
@ -66,7 +66,7 @@ public class SessionManagerActor extends ContextAwareActor {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private void broadcast(Object msg) {
 | 
					    private void broadcast(Object msg) {
 | 
				
			||||||
        sessionActors.values().stream().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
 | 
					        sessionActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private void onSessionTimeout(SessionTimeoutMsg msg) {
 | 
					    private void onSessionTimeout(SessionTimeoutMsg msg) {
 | 
				
			||||||
 | 
				
			|||||||
@ -74,7 +74,7 @@ public abstract class PluginManager {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public void broadcast(Object msg) {
 | 
					    public void broadcast(Object msg) {
 | 
				
			||||||
        pluginActors.values().stream().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
 | 
					        pluginActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public void remove(PluginId id) {
 | 
					    public void remove(PluginId id) {
 | 
				
			||||||
 | 
				
			|||||||
@ -100,7 +100,7 @@ public class TenantActor extends ContextAwareActor {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    private void broadcast(Object msg) {
 | 
					    private void broadcast(Object msg) {
 | 
				
			||||||
        pluginManager.broadcast(msg);
 | 
					        pluginManager.broadcast(msg);
 | 
				
			||||||
        deviceActors.values().stream().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
 | 
					        deviceActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private void onToDeviceActorMsg(ToDeviceActorMsg msg) {
 | 
					    private void onToDeviceActorMsg(ToDeviceActorMsg msg) {
 | 
				
			||||||
 | 
				
			|||||||
@ -166,7 +166,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
 | 
				
			|||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
 | 
					    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
 | 
				
			||||||
        publishCurrentServer();
 | 
					        publishCurrentServer();
 | 
				
			||||||
        getOtherServers().stream().forEach(
 | 
					        getOtherServers().forEach(
 | 
				
			||||||
                server -> log.info("Found active server: [{}:{}]", server.getHost(), server.getPort())
 | 
					                server -> log.info("Found active server: [{}:{}]", server.getHost(), server.getPort())
 | 
				
			||||||
        );
 | 
					        );
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -194,13 +194,13 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
 | 
				
			|||||||
        log.info("Processing [{}] event for [{}:{}]", pathChildrenCacheEvent.getType(), instance.getHost(), instance.getPort());
 | 
					        log.info("Processing [{}] event for [{}:{}]", pathChildrenCacheEvent.getType(), instance.getHost(), instance.getPort());
 | 
				
			||||||
        switch (pathChildrenCacheEvent.getType()) {
 | 
					        switch (pathChildrenCacheEvent.getType()) {
 | 
				
			||||||
            case CHILD_ADDED:
 | 
					            case CHILD_ADDED:
 | 
				
			||||||
                listeners.stream().forEach(listener -> listener.onServerAdded(instance));
 | 
					                listeners.forEach(listener -> listener.onServerAdded(instance));
 | 
				
			||||||
                break;
 | 
					                break;
 | 
				
			||||||
            case CHILD_UPDATED:
 | 
					            case CHILD_UPDATED:
 | 
				
			||||||
                listeners.stream().forEach(listener -> listener.onServerUpdated(instance));
 | 
					                listeners.forEach(listener -> listener.onServerUpdated(instance));
 | 
				
			||||||
                break;
 | 
					                break;
 | 
				
			||||||
            case CHILD_REMOVED:
 | 
					            case CHILD_REMOVED:
 | 
				
			||||||
                listeners.stream().forEach(listener -> listener.onServerRemoved(instance));
 | 
					                listeners.forEach(listener -> listener.onServerRemoved(instance));
 | 
				
			||||||
                break;
 | 
					                break;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
				
			|||||||
@ -135,7 +135,7 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService, D
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    private void logCircle() {
 | 
					    private void logCircle() {
 | 
				
			||||||
        log.trace("Consistent Hash Circle Start");
 | 
					        log.trace("Consistent Hash Circle Start");
 | 
				
			||||||
        circle.entrySet().stream().forEach((e) -> log.debug("{} -> {}", e.getKey(), e.getValue().getServerAddress()));
 | 
					        circle.entrySet().forEach((e) -> log.debug("{} -> {}", e.getKey(), e.getValue().getServerAddress()));
 | 
				
			||||||
        log.trace("Consistent Hash Circle End");
 | 
					        log.trace("Consistent Hash Circle End");
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -31,7 +31,6 @@ import org.thingsboard.server.dao.component.ComponentDescriptorService;
 | 
				
			|||||||
import org.thingsboard.server.extensions.api.component.*;
 | 
					import org.thingsboard.server.extensions.api.component.*;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import javax.annotation.PostConstruct;
 | 
					import javax.annotation.PostConstruct;
 | 
				
			||||||
import java.io.IOException;
 | 
					 | 
				
			||||||
import java.lang.annotation.Annotation;
 | 
					import java.lang.annotation.Annotation;
 | 
				
			||||||
import java.util.*;
 | 
					import java.util.*;
 | 
				
			||||||
import java.util.stream.Collectors;
 | 
					import java.util.stream.Collectors;
 | 
				
			||||||
@ -72,7 +71,7 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private void registerComponents(Collection<ComponentDescriptor> comps) {
 | 
					    private void registerComponents(Collection<ComponentDescriptor> comps) {
 | 
				
			||||||
        comps.stream().forEach(c -> components.put(c.getClazz(), c));
 | 
					        comps.forEach(c -> components.put(c.getClazz(), c));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private List<ComponentDescriptor> persist(Set<BeanDefinition> filterDefs, ComponentType type) {
 | 
					    private List<ComponentDescriptor> persist(Set<BeanDefinition> filterDefs, ComponentType type) {
 | 
				
			||||||
@ -119,7 +118,7 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
 | 
				
			|||||||
                                throw new RuntimeException("Plugin " + def.getBeanClassName() + "action " + actionClazz.getName() + " has wrong component type!");
 | 
					                                throw new RuntimeException("Plugin " + def.getBeanClassName() + "action " + actionClazz.getName() + " has wrong component type!");
 | 
				
			||||||
                            }
 | 
					                            }
 | 
				
			||||||
                        }
 | 
					                        }
 | 
				
			||||||
                        scannedComponent.setActions(Arrays.asList(pluginAnnotation.actions()).stream().map(action -> action.getName()).collect(Collectors.joining(",")));
 | 
					                        scannedComponent.setActions(Arrays.stream(pluginAnnotation.actions()).map(action -> action.getName()).collect(Collectors.joining(",")));
 | 
				
			||||||
                        break;
 | 
					                        break;
 | 
				
			||||||
                    default:
 | 
					                    default:
 | 
				
			||||||
                        throw new RuntimeException(type + " is not supported yet!");
 | 
					                        throw new RuntimeException(type + " is not supported yet!");
 | 
				
			||||||
 | 
				
			|||||||
@ -20,9 +20,9 @@ import org.springframework.security.core.authority.SimpleGrantedAuthority;
 | 
				
			|||||||
import org.thingsboard.server.common.data.User;
 | 
					import org.thingsboard.server.common.data.User;
 | 
				
			||||||
import org.thingsboard.server.common.data.id.UserId;
 | 
					import org.thingsboard.server.common.data.id.UserId;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import java.util.Arrays;
 | 
					 | 
				
			||||||
import java.util.Collection;
 | 
					import java.util.Collection;
 | 
				
			||||||
import java.util.stream.Collectors;
 | 
					import java.util.stream.Collectors;
 | 
				
			||||||
 | 
					import java.util.stream.Stream;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
public class SecurityUser extends User {
 | 
					public class SecurityUser extends User {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -46,7 +46,7 @@ public class SecurityUser extends User {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    public Collection<? extends GrantedAuthority> getAuthorities() {
 | 
					    public Collection<? extends GrantedAuthority> getAuthorities() {
 | 
				
			||||||
        if (authorities == null) {
 | 
					        if (authorities == null) {
 | 
				
			||||||
            authorities = Arrays.asList(SecurityUser.this.getAuthority()).stream()
 | 
					            authorities = Stream.of(SecurityUser.this.getAuthority())
 | 
				
			||||||
                    .map(authority -> new SimpleGrantedAuthority(authority.name()))
 | 
					                    .map(authority -> new SimpleGrantedAuthority(authority.name()))
 | 
				
			||||||
                    .collect(Collectors.toList());
 | 
					                    .collect(Collectors.toList());
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
				
			|||||||
@ -129,8 +129,10 @@ public abstract class AbstractControllerTest {
 | 
				
			|||||||
    @Autowired
 | 
					    @Autowired
 | 
				
			||||||
    void setConverters(HttpMessageConverter<?>[] converters) {
 | 
					    void setConverters(HttpMessageConverter<?>[] converters) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        this.mappingJackson2HttpMessageConverter = Arrays.asList(converters).stream().filter(
 | 
					        this.mappingJackson2HttpMessageConverter = Arrays.stream(converters)
 | 
				
			||||||
                hmc -> hmc instanceof MappingJackson2HttpMessageConverter).findAny().get();
 | 
					                .filter(hmc -> hmc instanceof MappingJackson2HttpMessageConverter)
 | 
				
			||||||
 | 
					                .findAny()
 | 
				
			||||||
 | 
					                .get();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        Assert.assertNotNull("the JSON message converter must not be null",
 | 
					        Assert.assertNotNull("the JSON message converter must not be null",
 | 
				
			||||||
                this.mappingJackson2HttpMessageConverter);
 | 
					                this.mappingJackson2HttpMessageConverter);
 | 
				
			||||||
 | 
				
			|||||||
@ -61,8 +61,10 @@ public class AbstractFeatureIntegrationTest {
 | 
				
			|||||||
    @Autowired
 | 
					    @Autowired
 | 
				
			||||||
    void setConverters(HttpMessageConverter<?>[] converters) {
 | 
					    void setConverters(HttpMessageConverter<?>[] converters) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        this.mappingJackson2HttpMessageConverter = Arrays.asList(converters).stream().filter(
 | 
					        this.mappingJackson2HttpMessageConverter = Arrays.stream(converters)
 | 
				
			||||||
                hmc -> hmc instanceof MappingJackson2HttpMessageConverter).findAny().get();
 | 
					                .filter(hmc -> hmc instanceof MappingJackson2HttpMessageConverter)
 | 
				
			||||||
 | 
					                .findAny()
 | 
				
			||||||
 | 
					                .get();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        assertNotNull("the JSON message converter must not be null",
 | 
					        assertNotNull("the JSON message converter must not be null",
 | 
				
			||||||
                this.mappingJackson2HttpMessageConverter);
 | 
					                this.mappingJackson2HttpMessageConverter);
 | 
				
			||||||
 | 
				
			|||||||
@ -140,7 +140,7 @@ public class BaseAttributesDao extends AbstractDao implements AttributesDao {
 | 
				
			|||||||
        List<Row> rows = resultSet.all();
 | 
					        List<Row> rows = resultSet.all();
 | 
				
			||||||
        List<AttributeKvEntry> entries = new ArrayList<>(rows.size());
 | 
					        List<AttributeKvEntry> entries = new ArrayList<>(rows.size());
 | 
				
			||||||
        if (!rows.isEmpty()) {
 | 
					        if (!rows.isEmpty()) {
 | 
				
			||||||
            rows.stream().forEach(row -> {
 | 
					            rows.forEach(row -> {
 | 
				
			||||||
                String key = row.getString(ModelConstants.ATTRIBUTE_KEY_COLUMN);
 | 
					                String key = row.getString(ModelConstants.ATTRIBUTE_KEY_COLUMN);
 | 
				
			||||||
                AttributeKvEntry kvEntry = convertResultToAttributesKvEntry(key, row);
 | 
					                AttributeKvEntry kvEntry = convertResultToAttributesKvEntry(key, row);
 | 
				
			||||||
                if (kvEntry != null) {
 | 
					                if (kvEntry != null) {
 | 
				
			||||||
 | 
				
			|||||||
@ -143,7 +143,7 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
 | 
				
			|||||||
    public List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
 | 
					    public List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
 | 
				
			||||||
        List<TsKvEntry> entries = new ArrayList<>(rows.size());
 | 
					        List<TsKvEntry> entries = new ArrayList<>(rows.size());
 | 
				
			||||||
        if (!rows.isEmpty()) {
 | 
					        if (!rows.isEmpty()) {
 | 
				
			||||||
            rows.stream().forEach(row -> {
 | 
					            rows.forEach(row -> {
 | 
				
			||||||
                TsKvEntry kvEntry = convertResultToTsKvEntry(row);
 | 
					                TsKvEntry kvEntry = convertResultToTsKvEntry(row);
 | 
				
			||||||
                if (kvEntry != null) {
 | 
					                if (kvEntry != null) {
 | 
				
			||||||
                    entries.add(kvEntry);
 | 
					                    entries.add(kvEntry);
 | 
				
			||||||
 | 
				
			|||||||
@ -91,7 +91,7 @@ public class DefaultWebsocketMsgHandler implements WebsocketMsgHandler {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public void clear(PluginContext ctx) {
 | 
					    public void clear(PluginContext ctx) {
 | 
				
			||||||
        wsSessionsMap.values().stream().forEach(v -> {
 | 
					        wsSessionsMap.values().forEach(v -> {
 | 
				
			||||||
            try {
 | 
					            try {
 | 
				
			||||||
                ctx.close(v.getSessionRef());
 | 
					                ctx.close(v.getSessionRef());
 | 
				
			||||||
            } catch (IOException e) {
 | 
					            } catch (IOException e) {
 | 
				
			||||||
 | 
				
			|||||||
@ -40,7 +40,9 @@ public class MethodNameFilter extends SimpleRuleLifecycleComponent implements Ru
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    public void init(MethodNameFilterConfiguration configuration) {
 | 
					    public void init(MethodNameFilterConfiguration configuration) {
 | 
				
			||||||
        methods = Arrays.asList(configuration.getMethodNames()).stream().map(m -> m.getName()).collect(Collectors.toSet());
 | 
					        methods = Arrays.stream(configuration.getMethodNames())
 | 
				
			||||||
 | 
					                .map(m -> m.getName())
 | 
				
			||||||
 | 
					                .collect(Collectors.toSet());
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
 | 
				
			|||||||
@ -39,7 +39,7 @@ public class MsgTypeFilter extends SimpleRuleLifecycleComponent implements RuleF
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    public void init(MsgTypeFilterConfiguration configuration) {
 | 
					    public void init(MsgTypeFilterConfiguration configuration) {
 | 
				
			||||||
        msgTypes = Arrays.asList(configuration.getMessageTypes()).stream().map(type -> {
 | 
					        msgTypes = Arrays.stream(configuration.getMessageTypes()).map(type -> {
 | 
				
			||||||
            switch (type) {
 | 
					            switch (type) {
 | 
				
			||||||
                case "GET_ATTRIBUTES":
 | 
					                case "GET_ATTRIBUTES":
 | 
				
			||||||
                    return MsgType.GET_ATTRIBUTES_REQUEST;
 | 
					                    return MsgType.GET_ATTRIBUTES_REQUEST;
 | 
				
			||||||
 | 
				
			|||||||
@ -75,7 +75,7 @@ public class MailPlugin extends AbstractPlugin<MailPluginConfiguration> implemen
 | 
				
			|||||||
        if (configuration.getOtherProperties() != null) {
 | 
					        if (configuration.getOtherProperties() != null) {
 | 
				
			||||||
            Properties mailProperties = new Properties();
 | 
					            Properties mailProperties = new Properties();
 | 
				
			||||||
            configuration.getOtherProperties()
 | 
					            configuration.getOtherProperties()
 | 
				
			||||||
                    .stream().forEach(p -> mailProperties.put(p.getKey(), p.getValue()));
 | 
					                    .forEach(p -> mailProperties.put(p.getKey(), p.getValue()));
 | 
				
			||||||
            mail.setJavaMailProperties(mailProperties);
 | 
					            mail.setJavaMailProperties(mailProperties);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        mailSender = mail;
 | 
					        mailSender = mail;
 | 
				
			||||||
 | 
				
			|||||||
@ -68,7 +68,7 @@ public class SubscriptionManager {
 | 
				
			|||||||
        registerSubscription(sessionId, deviceId, subscription);
 | 
					        registerSubscription(sessionId, deviceId, subscription);
 | 
				
			||||||
        List<TsKvEntry> missedUpdates = new ArrayList<>();
 | 
					        List<TsKvEntry> missedUpdates = new ArrayList<>();
 | 
				
			||||||
        if (subscription.getType() == SubscriptionType.ATTRIBUTES) {
 | 
					        if (subscription.getType() == SubscriptionType.ATTRIBUTES) {
 | 
				
			||||||
            subscription.getKeyStates().entrySet().stream().forEach(e -> {
 | 
					            subscription.getKeyStates().entrySet().forEach(e -> {
 | 
				
			||||||
                        Optional<AttributeKvEntry> latestOpt = ctx.loadAttribute(deviceId, DataConstants.CLIENT_SCOPE, e.getKey());
 | 
					                        Optional<AttributeKvEntry> latestOpt = ctx.loadAttribute(deviceId, DataConstants.CLIENT_SCOPE, e.getKey());
 | 
				
			||||||
                        if (latestOpt.isPresent()) {
 | 
					                        if (latestOpt.isPresent()) {
 | 
				
			||||||
                            AttributeKvEntry latestEntry = latestOpt.get();
 | 
					                            AttributeKvEntry latestEntry = latestOpt.get();
 | 
				
			||||||
 | 
				
			|||||||
@ -97,7 +97,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
 | 
				
			|||||||
        builder.setDeviceId(cmd.getDeviceId().toString());
 | 
					        builder.setDeviceId(cmd.getDeviceId().toString());
 | 
				
			||||||
        builder.setType(cmd.getType().name());
 | 
					        builder.setType(cmd.getType().name());
 | 
				
			||||||
        builder.setAllKeys(cmd.isAllKeys());
 | 
					        builder.setAllKeys(cmd.isAllKeys());
 | 
				
			||||||
        cmd.getKeyStates().entrySet().stream().forEach(e -> builder.addKeyStates(SubscriptionKetStateProto.newBuilder().setKey(e.getKey()).setTs(e.getValue()).build()));
 | 
					        cmd.getKeyStates().entrySet().forEach(e -> builder.addKeyStates(SubscriptionKetStateProto.newBuilder().setKey(e.getKey()).setTs(e.getValue()).build()));
 | 
				
			||||||
        ctx.sendPluginRpcMsg(new RpcMsg(address, SUBSCRIPTION_CLAZZ, builder.build().toByteArray()));
 | 
					        ctx.sendPluginRpcMsg(new RpcMsg(address, SUBSCRIPTION_CLAZZ, builder.build().toByteArray()));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -144,7 +144,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
 | 
				
			|||||||
        if (update.getErrorMsg() != null) {
 | 
					        if (update.getErrorMsg() != null) {
 | 
				
			||||||
            builder.setErrorMsg(update.getErrorMsg());
 | 
					            builder.setErrorMsg(update.getErrorMsg());
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        update.getData().entrySet().stream().forEach(
 | 
					        update.getData().entrySet().forEach(
 | 
				
			||||||
                e -> {
 | 
					                e -> {
 | 
				
			||||||
                    SubscriptionUpdateValueListProto.Builder dataBuilder = SubscriptionUpdateValueListProto.newBuilder();
 | 
					                    SubscriptionUpdateValueListProto.Builder dataBuilder = SubscriptionUpdateValueListProto.newBuilder();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -166,7 +166,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
 | 
				
			|||||||
            return new SubscriptionUpdate(proto.getSubscriptionId(), SubscriptionErrorCode.forCode(proto.getErrorCode()), proto.getErrorMsg());
 | 
					            return new SubscriptionUpdate(proto.getSubscriptionId(), SubscriptionErrorCode.forCode(proto.getErrorCode()), proto.getErrorMsg());
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
            Map<String, List<Object>> data = new TreeMap<>();
 | 
					            Map<String, List<Object>> data = new TreeMap<>();
 | 
				
			||||||
            proto.getDataList().stream().forEach(v -> {
 | 
					            proto.getDataList().forEach(v -> {
 | 
				
			||||||
                List<Object> values = data.get(v.getKey());
 | 
					                List<Object> values = data.get(v.getKey());
 | 
				
			||||||
                if (values == null) {
 | 
					                if (values == null) {
 | 
				
			||||||
                    values = new ArrayList<>();
 | 
					                    values = new ArrayList<>();
 | 
				
			||||||
 | 
				
			|||||||
@ -109,8 +109,8 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
 | 
				
			|||||||
                    sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
 | 
					                    sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    Map<String, Long> subState = new HashMap<>(keys.size());
 | 
					                    Map<String, Long> subState = new HashMap<>(keys.size());
 | 
				
			||||||
                    keys.stream().forEach(key -> subState.put(key, 0L));
 | 
					                    keys.forEach(key -> subState.put(key, 0L));
 | 
				
			||||||
                    attributesData.stream().forEach(v -> subState.put(v.getKey(), v.getTs()));
 | 
					                    attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, false, subState);
 | 
					                    sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, false, subState);
 | 
				
			||||||
                } else {
 | 
					                } else {
 | 
				
			||||||
@ -119,7 +119,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
 | 
				
			|||||||
                    sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
 | 
					                    sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    Map<String, Long> subState = new HashMap<>(attributesData.size());
 | 
					                    Map<String, Long> subState = new HashMap<>(attributesData.size());
 | 
				
			||||||
                    attributesData.stream().forEach(v -> subState.put(v.getKey(), v.getTs()));
 | 
					                    attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, true, subState);
 | 
					                    sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, true, subState);
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
@ -154,8 +154,8 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
 | 
				
			|||||||
                        sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
 | 
					                        sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        Map<String, Long> subState = new HashMap<>(keys.size());
 | 
					                        Map<String, Long> subState = new HashMap<>(keys.size());
 | 
				
			||||||
                        keys.stream().forEach(key -> subState.put(key, startTs));
 | 
					                        keys.forEach(key -> subState.put(key, startTs));
 | 
				
			||||||
                        data.stream().forEach(v -> subState.put(v.getKey(), v.getTs()));
 | 
					                        data.forEach(v -> subState.put(v.getKey(), v.getTs()));
 | 
				
			||||||
                        SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, false, subState);
 | 
					                        SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, false, subState);
 | 
				
			||||||
                        subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
 | 
					                        subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
 | 
				
			||||||
                    } else {
 | 
					                    } else {
 | 
				
			||||||
@ -168,8 +168,8 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
 | 
				
			|||||||
                                sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
 | 
					                                sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                                Map<String, Long> subState = new HashMap<>(keys.size());
 | 
					                                Map<String, Long> subState = new HashMap<>(keys.size());
 | 
				
			||||||
                                keys.stream().forEach(key -> subState.put(key, startTs));
 | 
					                                keys.forEach(key -> subState.put(key, startTs));
 | 
				
			||||||
                                data.stream().forEach(v -> subState.put(v.getKey(), v.getTs()));
 | 
					                                data.forEach(v -> subState.put(v.getKey(), v.getTs()));
 | 
				
			||||||
                                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, false, subState);
 | 
					                                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, false, subState);
 | 
				
			||||||
                                subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
 | 
					                                subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
 | 
				
			||||||
                            }
 | 
					                            }
 | 
				
			||||||
@ -188,7 +188,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
 | 
				
			|||||||
                        public void onSuccess(PluginContext ctx, List<TsKvEntry> data) {
 | 
					                        public void onSuccess(PluginContext ctx, List<TsKvEntry> data) {
 | 
				
			||||||
                            sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
 | 
					                            sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
 | 
				
			||||||
                            Map<String, Long> subState = new HashMap<>(data.size());
 | 
					                            Map<String, Long> subState = new HashMap<>(data.size());
 | 
				
			||||||
                            data.stream().forEach(v -> subState.put(v.getKey(), v.getTs()));
 | 
					                            data.forEach(v -> subState.put(v.getKey(), v.getTs()));
 | 
				
			||||||
                            SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, true, subState);
 | 
					                            SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, true, subState);
 | 
				
			||||||
                            subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
 | 
					                            subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
 | 
				
			||||||
                        }
 | 
					                        }
 | 
				
			||||||
 | 
				
			|||||||
@ -47,7 +47,7 @@ public class KafkaPlugin extends AbstractPlugin<KafkaPluginConfiguration> {
 | 
				
			|||||||
        properties.put("buffer.memory", configuration.getBufferMemory());
 | 
					        properties.put("buffer.memory", configuration.getBufferMemory());
 | 
				
			||||||
        if (configuration.getOtherProperties() != null) {
 | 
					        if (configuration.getOtherProperties() != null) {
 | 
				
			||||||
            configuration.getOtherProperties()
 | 
					            configuration.getOtherProperties()
 | 
				
			||||||
                    .stream().forEach(p -> properties.put(p.getKey(), p.getValue()));
 | 
					                    .forEach(p -> properties.put(p.getKey(), p.getValue()));
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        init();
 | 
					        init();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user