Current position: Home > In-depth understanding of Kafka: using Kafka from Java

In-depth understanding of Kafka: using Kafka from Java

2022-01-27 03:22:52 zhengzaifeidelushang

1. Add the Kafka client dependency in pom.xml

		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>2.7.2</version>
		</dependency>

2. Kafka consumer program

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MyConsumer {

    /** Topic this consumer subscribes to. */
    private final static String TOPIC_NAME = "optics-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Properties props = new Properties();
        // Kafka cluster bootstrap addresses
        props.put("bootstrap.servers", "10.129.88.26:9092,10.129.88.32:9092,10.129.88.39:9092");
        // SASL/PLAIN authentication settings
        // NOTE(review): credentials are hard-coded here; consider loading them from external configuration.
        props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule " +
                "required username=\"debezium\" password=\"NGFlM2I1NTJlNmFk\";");
        props.put("security.protocol","SASL_PLAINTEXT");
        props.put("sasl.mechanism","PLAIN");
        // Consumer group id
        props.put("group.id", "opticsgroup");
        // Key/value deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources guarantees the consumer is closed on any exit path,
        // exactly as the original finally-block did.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribe to the topic
            consumer.subscribe(Arrays.asList(TOPIC_NAME));

            // Poll forever, printing every record that arrives
            while (true) {
                ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> rec : batch) {
                    System.out.printf("topic = %s, partition = %s, offset = %d, key = %s, value =%s\n",
                            rec.topic(), rec.partition(), rec.offset(), rec.key(), rec.value());
                }
            }
        }
    }
}

3. Kafka producer program

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MyProducer {

    /** Topic that messages are published to. */
    private final static String TOPIC_NAME = "optics-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Properties props = new Properties();
        // Kafka cluster bootstrap addresses
        props.put("bootstrap.servers", "10.129.88.26:9092,10.129.88.32:9092,10.129.88.39:9092");
        // SASL/PLAIN authentication settings
        // SECURITY NOTE: credentials are hard-coded; load them from external configuration instead.
        props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule " +
                "required username=\"debezium\" password=\"NGFlM2I1NTJlNmFk\";");
        props.put("security.protocol","SASL_PLAINTEXT");
        props.put("sasl.mechanism","PLAIN");
        // acks=-1 (same as "all"): slowest but safest mode — waits for all in-sync replicas
        props.put("acks", "-1");
        props.put("retries", 3);
        // Max batch size in bytes per partition; a full batch is submitted to the broker
        props.put("batch.size", 16384);
        // Max request size the producer may send (default 1048576 bytes = 1 MB)
        //props.put("max.request.size",10);
        // How long a message may sit in the buffer before being submitted to the broker
        props.put("linger.ms", 10);
        // Total memory used by the whole producer; when the buffer fills, data is submitted.
        // buffer.memory must be greater than batch.size, otherwise an out-of-memory error occurs.
        props.put("buffer.memory", 33554432);
        // Key/value serializers
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // FIX: the original never closed the producer (resource leak; buffered records
        // could be dropped on exit). try-with-resources closes it — flushing pending
        // records — even if send() or get() throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // key: decides which partition the record goes to; value: the message payload.
            // .get() blocks on the returned Future, making each send synchronous.
            for (int i = 0; i < 10; i++) {
                RecordMetadata metadata = producer.send(
                        new ProducerRecord<>(TOPIC_NAME, Integer.toString(i), "dd:" + i)).get();
                System.out.println(" Send message results in synchronous mode :" + "topic name :" + metadata.topic() + " | partition Partition :" + metadata.partition() + " | offset Offset :" + metadata.offset());
            }
        }
    }
}

4. Run the Kafka producer program first, then check the Kafka consumer program

Running the producer program first produces the following output:

 Insert picture description here
The consumer output is shown below:

 Insert picture description here

copyright notice
author[zhengzaifeidelushang],Please bring the original link to reprint, thank you.
https://en.cdmana.com/2022/01/202201270322511058.html

Random recommended