fixed sparkplug tests
This commit is contained in:
		
							parent
							
								
									9515bdea1d
								
							
						
					
					
						commit
						7945669303
					
				@ -63,7 +63,9 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra
 | 
			
		||||
                    return finalFuture.get().get().isPresent();
 | 
			
		||||
                });
 | 
			
		||||
        TsKvEntry actualTsKvEntry = finalFuture.get().get().get();
 | 
			
		||||
        Assert.assertEquals(expectedTsKvEntry, actualTsKvEntry);
 | 
			
		||||
        Assert.assertEquals(expectedTsKvEntry.getKey(), actualTsKvEntry.getKey());
 | 
			
		||||
        Assert.assertEquals(expectedTsKvEntry.getValue(), actualTsKvEntry.getValue());
 | 
			
		||||
        Assert.assertEquals(expectedTsKvEntry.getTs(), actualTsKvEntry.getTs());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected void processClientWithCorrectNodeAccessTokenWithoutNDEATH_Test() throws Exception {
 | 
			
		||||
@ -95,20 +97,27 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra
 | 
			
		||||
        List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts);
 | 
			
		||||
 | 
			
		||||
        TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(messageName(STATE), ONLINE.name()));
 | 
			
		||||
        AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>();
 | 
			
		||||
        await(alias + messageName(STATE) + ", device: " + savedGateway.getName())
 | 
			
		||||
                .atMost(40, TimeUnit.SECONDS)
 | 
			
		||||
                .until(() -> {
 | 
			
		||||
                    finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId()));
 | 
			
		||||
                    return finalFuture.get().get().contains(tsKvEntry);
 | 
			
		||||
                    var foundEntry = tsService.findAllLatest(tenantId, savedGateway.getId()).get().stream()
 | 
			
		||||
                            .filter(tsKv -> tsKv.getKey().equals(tsKvEntry.getKey()))
 | 
			
		||||
                            .filter(tsKv -> tsKv.getValue().equals(tsKvEntry.getValue()))
 | 
			
		||||
                            .filter(tsKv -> tsKv.getTs() == tsKvEntry.getTs())
 | 
			
		||||
                            .findFirst();
 | 
			
		||||
                    return foundEntry.isPresent();
 | 
			
		||||
                });
 | 
			
		||||
 | 
			
		||||
        for (Device device : devices) {
 | 
			
		||||
            await(alias + messageName(STATE) + ", device: " + device.getName())
 | 
			
		||||
                    .atMost(40, TimeUnit.SECONDS)
 | 
			
		||||
                    .until(() -> {
 | 
			
		||||
                        finalFuture.set(tsService.findAllLatest(tenantId, device.getId()));
 | 
			
		||||
                        return finalFuture.get().get().contains(tsKvEntry);
 | 
			
		||||
                        var foundEntry = tsService.findAllLatest(tenantId, device.getId()).get().stream()
 | 
			
		||||
                                .filter(tsKv -> tsKv.getKey().equals(tsKvEntry.getKey()))
 | 
			
		||||
                                .filter(tsKv -> tsKv.getValue().equals(tsKvEntry.getValue()))
 | 
			
		||||
                                .filter(tsKv -> tsKv.getTs() == tsKvEntry.getTs())
 | 
			
		||||
                                .findFirst();
 | 
			
		||||
                        return foundEntry.isPresent();
 | 
			
		||||
                    });
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -78,7 +78,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
 | 
			
		||||
                    finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId()));
 | 
			
		||||
                    return finalFuture.get().get().size() == (listTsKvEntry.size() + 1);
 | 
			
		||||
                });
 | 
			
		||||
        Assert.assertTrue("Actual tsKvEntrys is not containsAll Expected tsKvEntrys", finalFuture.get().get().containsAll(listTsKvEntry));
 | 
			
		||||
        Assert.assertTrue("Actual tsKvEntries is not containsAll Expected tsKvEntries", containsIgnoreVersion(finalFuture.get().get(), listTsKvEntry));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected void processClientWithCorrectAccessTokenPushNodeMetricBuildArraysPrimitiveSimple() throws Exception {
 | 
			
		||||
@ -107,7 +107,20 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
 | 
			
		||||
                    finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId()));
 | 
			
		||||
                    return finalFuture.get().get().size() == (listTsKvEntry.size() + 1);
 | 
			
		||||
                });
 | 
			
		||||
        Assert.assertTrue("Actual tsKvEntrys is not containsAll Expected tsKvEntrys", finalFuture.get().get().containsAll(listTsKvEntry));
 | 
			
		||||
        Assert.assertTrue("Actual tsKvEntries is not containsAll Expected tsKvEntries", containsIgnoreVersion(finalFuture.get().get(), listTsKvEntry));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static boolean containsIgnoreVersion(List<TsKvEntry> expected, List<TsKvEntry> actual) {
 | 
			
		||||
        for (TsKvEntry actualEntry : actual) {
 | 
			
		||||
            var found = expected.stream()
 | 
			
		||||
                    .filter(tsKv -> tsKv.getKey().equals(actualEntry.getKey()))
 | 
			
		||||
                    .filter(tsKv -> tsKv.getValue().equals(actualEntry.getValue()))
 | 
			
		||||
                    .filter(tsKv -> tsKv.getTs() == actualEntry.getTs())
 | 
			
		||||
                    .findFirst();
 | 
			
		||||
            if (found.isEmpty()) {
 | 
			
		||||
                return false;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        return true;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user