1 change: 1 addition & 0 deletions .github/CODEOWNERS
@@ -8,6 +8,7 @@ gradle/libs.versions.toml @kafbat/backend
/settings.gradle @kafbat/backend
/gradle/ @kafbat/backend
/contract/ @kafbat/backend
/contract-typespec/ @kafbat/backend
/api/ @kafbat/backend
/serde-api/ @kafbat/backend
/documentation/compose/ @kafbat/backend
19 changes: 16 additions & 3 deletions api/src/main/java/io/kafbat/ui/model/InternalClusterState.java
@@ -1,12 +1,14 @@
package io.kafbat.ui.model;

import com.google.common.base.Throwables;
import io.kafbat.ui.api.model.ControllerType;
import java.math.BigDecimal;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.Data;
import org.apache.kafka.common.Node;
import org.jetbrains.annotations.Nullable;

@Data
public class InternalClusterState {
@@ -27,6 +29,7 @@ public class InternalClusterState {
private BigDecimal bytesInPerSec;
private BigDecimal bytesOutPerSec;
private Boolean readOnly;
private ControllerType controller;

public InternalClusterState(KafkaCluster cluster, Statistics statistics) {
name = cluster.getName();
@@ -38,9 +41,7 @@ public InternalClusterState(KafkaCluster cluster, Statistics statistics) {
.orElse(null);
topicCount = (int) statistics.topicDescriptions().count();
brokerCount = statistics.getClusterDescription().getNodes().size();
activeControllers = Optional.ofNullable(statistics.getClusterDescription().getController())
.map(Node::id)
.orElse(null);
activeControllers = getActiveControllers(statistics);
version = statistics.getVersion();

diskUsage = statistics.getClusterState().getNodesStates().values().stream()
@@ -78,6 +79,18 @@ public InternalClusterState(KafkaCluster cluster, Statistics statistics) {
outOfSyncReplicasCount = partitionsStats.getOutOfSyncReplicasCount();
underReplicatedPartitionCount = partitionsStats.getUnderReplicatedPartitionCount();
readOnly = cluster.isReadOnly();
controller = statistics.getController();
}

@Nullable
private static Integer getActiveControllers(Statistics statistics) {
if (ControllerType.KRAFT == statistics.getController()) {
return statistics.getQuorumLeaderId();
}

return Optional.ofNullable(statistics.getClusterDescription().getController())
.map(Node::id)
.orElse(null);
}

}
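
Taken together, the changes in this file mean the "active controller" id now comes from two different places depending on cluster mode: under KRaft it is the Raft quorum leader, under ZooKeeper it is the classic controller node. A minimal, self-contained sketch of that decision, using simplified stand-in types rather than the PR's actual classes:

import java.util.Optional;

// Stand-alone illustration of getActiveControllers(..) above;
// ControllerType and Node are simplified stand-ins for the real types.
public class ActiveControllerSketch {
  enum ControllerType { ZOOKEEPER, KRAFT }

  record Node(int id) {}

  static Integer activeController(ControllerType type,
                                  Integer quorumLeaderId,
                                  Node classicController) {
    if (type == ControllerType.KRAFT) {
      // KRaft mode: the leader id reported by describeMetadataQuorum()
      return quorumLeaderId;
    }
    // ZooKeeper mode: the controller node from describeCluster(), if known
    return Optional.ofNullable(classicController).map(Node::id).orElse(null);
  }

  public static void main(String[] args) {
    System.out.println(activeController(ControllerType.KRAFT, 3000, null));            // 3000
    System.out.println(activeController(ControllerType.ZOOKEEPER, null, new Node(1))); // 1
    System.out.println(activeController(ControllerType.ZOOKEEPER, null, null));        // null
  }
}

Returning null when neither source knows the controller preserves the behaviour of the previous Optional chain.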
3 changes: 3 additions & 0 deletions api/src/main/java/io/kafbat/ui/model/Statistics.java
@@ -1,5 +1,6 @@
package io.kafbat.ui.model;

import io.kafbat.ui.api.model.ControllerType;
import io.kafbat.ui.service.ReactiveAdminClient;
import io.kafbat.ui.service.metrics.scrape.KafkaConnectState;
import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState;
@@ -22,6 +23,8 @@ public class Statistics implements AutoCloseable {
Metrics metrics;
ScrapedClusterState clusterState;
Map<String, KafkaConnectState> connectStates;
ControllerType controller;
Integer quorumLeaderId;

public static Statistics empty() {
return builder()
57 changes: 35 additions & 22 deletions api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java
@@ -62,6 +62,7 @@
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -153,7 +154,7 @@ private record ConfigRelatedInfo(String version,
private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
return ReactiveAdminClient.describeClusterImpl(ac, Set.of())
.flatMap(desc -> {
// choosing node from which we will get configs (starting with controller)
// choosing the node which we will get configs from (starting with the controller)
var targetNodeId = Optional.ofNullable(desc.controller)
.map(Node::id)
.orElse(desc.getNodes().iterator().next().id());
@@ -183,7 +184,11 @@ private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
final String finalVersion = version.orElse(DEFAULT_UNKNOWN_VERSION);
final boolean finalTopicDeletionEnabled = topicDeletionEnabled;
return SupportedFeature.forVersion(ac, version)
.map(features -> new ConfigRelatedInfo(finalVersion, features, finalTopicDeletionEnabled));
.map(features -> new ConfigRelatedInfo(
finalVersion,
features,
finalTopicDeletionEnabled
));
});
})
.cache(UPDATE_DURATION);
@@ -211,19 +216,20 @@ private static Mono<Boolean> isAuthorizedSecurityEnabled(AdminClient ac, @Nullab
// NOTE: if KafkaFuture returns null, that Mono will be empty(!), since Reactor does not support nullable results
// (see MonoSink.success(..) javadoc for details)
public static <T> Mono<T> toMono(KafkaFuture<T> future) {
return Mono.<T>create(sink -> future.whenComplete((res, ex) -> {
if (ex != null) {
// KafkaFuture doc is unclear about what exception wrapper will be used
// (from the docs it should be ExecutionException, but we actually see CompletionException, so checking both)
if (ex instanceof CompletionException || ex instanceof ExecutionException) {
sink.error(ex.getCause()); //unwrapping exception
} else {
sink.error(ex);
}
} else {
sink.success(res);
}
})).doOnCancel(() -> future.cancel(true))
return Mono.<T>create(sink ->
future.whenComplete((res, ex) -> {
if (ex != null) {
// KafkaFuture doc is unclear about what exception wrapper will be used
// (from the docs it should be ExecutionException, but we actually see CompletionException, so checking both)
if (ex instanceof CompletionException || ex instanceof ExecutionException) {
sink.error(ex.getCause()); //unwrapping exception
} else {
sink.error(ex);
}
} else {
sink.success(res);
}
})).doOnCancel(() -> future.cancel(true))
// AdminClient uses a single thread for Kafka communication,
// and by default all downstream operations (like map(..)) on the created Mono will be executed on this thread.
// If some downstream operation is blocking (by mistake), this can lead to
@@ -427,20 +433,23 @@ public Mono<ClusterDescription> describeCluster() {
return describeClusterImpl(client, getClusterFeatures());
}

private static Mono<ClusterDescription> describeClusterImpl(AdminClient client, Set<SupportedFeature> features) {
private static Mono<ClusterDescription> describeClusterImpl(AdminClient client,
Set<SupportedFeature> features
) {
boolean includeAuthorizedOperations =
features.contains(SupportedFeature.DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS);

DescribeClusterResult result = client.describeCluster(
new DescribeClusterOptions().includeAuthorizedOperations(includeAuthorizedOperations));
var allOfFuture = KafkaFuture.allOf(
result.controller(), result.clusterId(), result.nodes(), result.authorizedOperations());
return toMono(allOfFuture).then(
Mono.fromCallable(() ->
new ClusterDescription(
result.controller().get(),
result.clusterId().get(),
result.nodes().get(),
result.authorizedOperations().get()
result.controller().get(),
result.clusterId().get(),
result.nodes().get(),
result.authorizedOperations().get()
)
)
);
@@ -616,8 +625,8 @@ private Mono<Collection<TopicPartition>> filterPartitionsWithLeaderCheck(Collect

@VisibleForTesting
static Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
Predicate<TopicPartition> partitionPredicate,
boolean failOnUnknownLeader) {
Predicate<TopicPartition> partitionPredicate,
boolean failOnUnknownLeader) {
var goodPartitions = new HashSet<TopicPartition>();
for (TopicDescription description : topicDescriptions) {
var goodTopicPartitions = new ArrayList<TopicPartition>();
@@ -715,6 +724,10 @@ public Mono<Void> alterClientQuota(ClientQuotaAlteration alteration) {
return toMono(client.alterClientQuotas(List.of(alteration)).all());
}

public Mono<QuorumInfo> describeMetadataQuorum() {
return toMono(client.describeMetadataQuorum().quorumInfo());
}


// returns tp -> list of active producer's states (if any)
public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(String topic) {
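
One subtlety in the toMono(..) NOTE above is worth a demo: completing a KafkaFuture with null makes the resulting Mono complete empty, because MonoSink.success(null) signals completion without a value. A hedged, standalone sketch (happy path only, without the exception unwrapping or cancellation handling; assumes kafka-clients and reactor-core on the classpath):

import org.apache.kafka.common.KafkaFuture;
import reactor.core.publisher.Mono;

public class ToMonoNullDemo {
  // Simplified copy of toMono(..): happy path only.
  static <T> Mono<T> toMono(KafkaFuture<T> future) {
    return Mono.<T>create(sink -> future.whenComplete((res, ex) -> {
      if (ex != null) {
        sink.error(ex);
      } else {
        sink.success(res); // success(null) completes the Mono empty
      }
    }));
  }

  public static void main(String[] args) {
    System.out.println(toMono(KafkaFuture.completedFuture(null)).hasElement().block()); // false
    System.out.println(toMono(KafkaFuture.completedFuture("ok")).block());              // ok
  }
}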
41 changes: 35 additions & 6 deletions api/src/main/java/io/kafbat/ui/service/StatisticsService.java
@@ -1,5 +1,7 @@
package io.kafbat.ui.service;

import static io.kafbat.ui.api.model.ControllerType.KRAFT;
import static io.kafbat.ui.api.model.ControllerType.ZOOKEEPER;
import static io.kafbat.ui.service.ReactiveAdminClient.ClusterDescription;

import io.kafbat.ui.config.ClustersProperties;
@@ -11,9 +13,13 @@
import io.kafbat.ui.service.metrics.scrape.KafkaConnectState;
import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.jetbrains.annotations.NotNull;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

@@ -32,7 +38,6 @@ public Mono<Statistics> updateCache(KafkaCluster c) {
return getStatistics(c).doOnSuccess(m -> cache.replace(c, m));
}

@SuppressWarnings("unchecked")
private Mono<Statistics> getStatistics(KafkaCluster cluster) {
return adminClientService.get(cluster).flatMap(ac ->
ac.describeCluster()
@@ -42,10 +47,17 @@ private Mono<Statistics> getStatistics(KafkaCluster cluster) {
Mono.zip(
featureService.getAvailableFeatures(ac, cluster, description),
loadClusterState(description, ac),
loadKafkaConnects(cluster)
loadKafkaConnects(cluster),
loadQuorumInfo(ac)
).flatMap(t ->
scrapeMetrics(cluster, t.getT2(), description)
.map(metrics -> createStats(description, t.getT1(), t.getT2(), t.getT3(), metrics, ac))
.map(metrics -> createStats(description,
t.getT1(),
t.getT2(),
t.getT3(),
metrics,
t.getT4(),
ac))
)
)
).doOnError(e ->
@@ -54,12 +66,25 @@ private Mono<Statistics> getStatistics(KafkaCluster cluster) {
.onErrorResume(t -> Mono.just(Statistics.statsUpdateError(t))));
}

@NotNull
private static Mono<Optional<QuorumInfo>> loadQuorumInfo(ReactiveAdminClient ac) {
return ac.describeMetadataQuorum()
.map(Optional::of)
.onErrorResume(t ->
t instanceof UnsupportedVersionException
? Mono.just(Optional.empty())
: Mono.error(t)
);
}

private Statistics createStats(ClusterDescription description,
List<ClusterFeature> features,
ScrapedClusterState scrapedClusterState,
List<KafkaConnectState> connects, Metrics metrics,
List<KafkaConnectState> connects,
Metrics metrics,
Optional<QuorumInfo> quorumInfo,
ReactiveAdminClient ac) {
return Statistics.builder()
var stats = Statistics.builder()
.status(ServerStatusDTO.ONLINE)
.clusterDescription(description)
.version(ac.getVersion())
@@ -71,7 +96,11 @@ private Statistics createStats(ClusterDescription description,
Collectors.toMap(KafkaConnectState::getName, c -> c)
)
)
.build();
.controller(quorumInfo.isPresent() ? KRAFT : ZOOKEEPER);

quorumInfo.ifPresent(i -> stats.quorumLeaderId(i.leaderId()));
Member:

I would save the QuorumInfo into stats, just for the sake of the future.

Member Author:

That was my initial idea too, but I decided not to: QuorumInfo.nodes is an empty collection for some reason, voters was non-empty but quite boring from a data perspective, and the rest just isn't useful in the current implementation. What do you think?

Member:

I would still keep it there.


return stats.build();
}

private Mono<ScrapedClusterState> loadClusterState(ClusterDescription clusterDescription,
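
The loadQuorumInfo fallback above is the load-bearing part of this file: describeMetadataQuorum fails with UnsupportedVersionException on ZooKeeper-mode clusters, and the service maps that failure to Optional.empty() instead of failing the whole statistics refresh. The same pattern, generalized into a self-contained sketch (the helper and class names are assumptions, not code from this PR):

import java.util.Optional;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import reactor.core.publisher.Mono;

public class UnsupportedAsEmpty {
  // Treat "broker too old / wrong mode" as absence of data, not as an error.
  static <T> Mono<Optional<T>> optionalOnUnsupported(Mono<T> source) {
    return source
        .map(Optional::of)
        .onErrorResume(UnsupportedVersionException.class, e -> Mono.just(Optional.<T>empty()));
  }

  public static void main(String[] args) {
    Mono<String> zkMode = Mono.error(new UnsupportedVersionException("no KRaft quorum"));
    System.out.println(optionalOnUnsupported(zkMode).block());                    // Optional.empty
    System.out.println(optionalOnUnsupported(Mono.just("leader=3000")).block()); // Optional[leader=3000]
  }
}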
6 changes: 6 additions & 0 deletions contract-typespec/api/clusters.tsp
@@ -46,6 +46,7 @@ model Cluster {
readOnly?: boolean;
version?: string;
features?: ClusterFeature[];
controller?: ControllerType;
}

alias ClusterFeature =
@@ -98,3 +99,8 @@ model BrokerDiskUsage {
segmentSize?: int64;
segmentCount?: int32;
}

enum ControllerType {
ZOOKEEPER,
KRAFT
}
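
This TypeSpec enum is the contract-level source of the ControllerType used on both sides: io.kafbat.ui.api.model.ControllerType in the Java changes above, and the ControllerType imported from generated-sources in the frontend changes below. Presumably the backend generator emits roughly this shape (a sketch, not the actual generated file, which would also carry serialization annotations and helpers):

package io.kafbat.ui.api.model;

// Sketch of the generated enum; the real generated source is more elaborate.
public enum ControllerType {
  ZOOKEEPER,
  KRAFT
}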
frontend: BrokersList.tsx
@@ -5,7 +5,7 @@ import useAppParams from 'lib/hooks/useAppParams';
import Table from 'components/common/NewTable';
import { clusterBrokerPath } from 'lib/paths';
import { useBrokers } from 'lib/hooks/api/brokers';
import { useClusterStats } from 'lib/hooks/api/clusters';
import { useClusters, useClusterStats } from 'lib/hooks/api/clusters';
import ResourcePageHeading from 'components/common/ResourcePageHeading/ResourcePageHeading';

import { BrokersMetrics } from './BrokersMetrics/BrokersMetrics';
@@ -14,6 +14,8 @@ import { getBrokersTableColumns, getBrokersTableRows } from './lib';
const BrokersList: React.FC = () => {
const navigate = useNavigate();
const { clusterName } = useAppParams<{ clusterName: ClusterName }>();
const { data: clusterData } = useClusters();
const cluster = clusterData?.find(({ name }) => name === clusterName);
const { data: clusterStats = {} } = useClusterStats(clusterName);
const { data: brokers } = useBrokers(clusterName);

@@ -56,6 +58,7 @@ const BrokersList: React.FC = () => {
offlinePartitionCount={offlinePartitionCount}
onlinePartitionCount={onlinePartitionCount}
underReplicatedPartitionCount={underReplicatedPartitionCount}
controller={cluster?.controller}
/>

<Table
frontend: BrokersMetrics.tsx
@@ -1,5 +1,6 @@
import React from 'react';
import * as Metrics from 'components/common/Metrics';
import { ControllerType } from 'generated-sources';

import * as S from './BrokersMetrics.styled';

@@ -12,6 +13,7 @@ type BrokersMetricsProps = {
onlinePartitionCount: number | undefined;
underReplicatedPartitionCount: number | undefined;
version: string | undefined;
controller: ControllerType | undefined;
};

export const BrokersMetrics = ({
@@ -23,6 +25,7 @@ export const BrokersMetrics = ({
offlinePartitionCount,
underReplicatedPartitionCount,
onlinePartitionCount,
controller,
}: BrokersMetricsProps) => {
const replicas = (inSyncReplicasCount ?? 0) + (outOfSyncReplicasCount ?? 0);
const areAllInSync = inSyncReplicasCount && replicas === inSyncReplicasCount;
@@ -99,6 +102,10 @@ export const BrokersMetrics = ({
<Metrics.Indicator label="Out Of Sync Replicas">
{outOfSyncReplicasCount}
</Metrics.Indicator>

<Metrics.Indicator label="Controller Type">
{controller === ControllerType.KRAFT ? 'KRaft' : 'ZooKeeper'}
</Metrics.Indicator>
</Metrics.Section>
</Metrics.Wrapper>
);