Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
1.7k views
in Technique[技术] by (71.8m points)

get topic from kafka message in spark

In our spark-streaming job we read messages in streaming from kafka.

For this, we use the KafkaUtils.createDirectStream API which returns JavaPairInputDStreamfrom.

The messages are read from kafka (from three topics - test1,test2,test3) in the following way:

private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));

HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);

JavaPairInputDStream<String, String> messages = 
KafkaUtils.createDirectStream(
                streamingContext,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
                );

We want to handle messages from each topic in a different way, and in order to achieve this we need to know the topic name for each message.

so we do the following:

JavaDStream<String> lines = messages.map(new SplitToLinesFunction());

and this is the implementation of the SplitToLinesFunction:

public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
    @Override
    public String call(Tuple2<String, String> tuple2) 
    {
        System.out.println(tuple2._1);
        return tuple2._2();
    }
}

The problem is that the tuple2._1 is null and we assumed that the tuple2._1 will contain some metadata such as the name of the topic/partition from where the message came from.

However, when we print tuple2._1, it's null.

Our question - is there a way to send the topic name in kafka so that in the spark-streaming code, the tuple2._1 will contain it (and not be null)?

Note that we also tried to get the topic names from the DStream as mentioned in the spark-streaming kafka-integration tutorial:

But it returns ALL the topics that were sent to the KafkaUtils.createDirectStream, and not the specific topic from where the messages (that belong to the current RDD) arrived from.

So it didn't help us to identify the name of the topic from where the messages in the RDD were sent from.

EDIT

in response to David's answer - I tried using the MessageAndMetadata like this:

        Map<TopicAndPartition, Long> topicAndPartition = new HashMap();
        topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
        topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
        topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);

        class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String>
        {

            @Override
            public String call(MessageAndMetadata<String, String> v1)
                    throws Exception {
                // nothing is printed here
                System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition());
                return v1.topic();
            }

        }

        JavaInputDStream<String> messages = KafkaUtils.createDirectStream(streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, topicAndPartition, new MessageAndMetadataFunction());
        messages.foreachRDD(new VoidFunction() {

            @Override
            public void call(Object t) throws Exception {
                JavaRDD<String> rdd = (JavaRDD<String>)t;
                OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                // here all the topics kafka listens to are printed, but that doesn't help
                for (OffsetRange offset : offsets) {
                    System.out.println(offset.topic() + " " + offset.partition() + " " + offset.fromOffset() + " " + offset.untilOffset());
                }
            }
        });

The problem is that nothing was printed in the MessageAndMetadataFunction.call method. what should I fix in order to get the relevant topic for that RDD inside the MessageAndMetadataFunction.call method?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

Use one of the versions of createDirectStream that takes a messageHandler function as a parameter. Here's what I do:

val messages = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, (String, Array[Byte]](
  ssc,
  kafkaParams,
  getPartitionsAndOffsets(topics).map(t => (t._1, t._2._1).toMap,
  (msg: MessageAndMetadata[Array[Byte],Array[Byte]]) => { (msg.topic, msg.message)}
)

There's stuff there that doesn't mean anything to you -- the relevant part is

(msg: MessageAndMetadata[Array[Byte],Array[Byte]]) => { (msg.topic, msg.message)}

If you are not familiar with Scala, all the function does is return a Tuple2 containing msg.topic and msg.message. Your function needs to return both of these in order for you to use them downstream. You could just return the entire MessageAndMetadata object instead, which gives you a couple of other interesting fields. But if you only wanted the topic and the message, then use the above.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

2.1m questions

2.1m answers

60 comments

56.5k users

...