Messages are not sent to Kafka from PySpark




I want to send data to Kafka using the Python Kafka connector. Everything works fine when I run the code from the pyspark shell.
However, when I run it with spark-submit, the messages are not sent. There are no errors in the logs and the job is reported as succeeded, but no messages reach Kafka.




import json
import datetime
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='XXX.XX.XXX.XXX:9092')
end = datetime.datetime.now().isoformat()
country = "es"
message = {'country': country, 'end': end, 'status': '1'}
msg = json.dumps(message)
print(msg)
# Without a value_serializer, kafka-python expects bytes on Python 3
producer.send('testtopic', msg.encode('utf-8'))



I don't understand why this happens. These are my spark-submit parameters:




spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 11g \
--driver-cores 3 \
--num-executors 6 \
--executor-memory 6g \
--executor-cores 2 \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.sql.broadcastTimeout=1500 \
--queue t1 \
s3://my-test-bucket/test1/test.py




2 Answers



I had to call producer.flush() after producer.send('testtopic', msg). Only then are the messages delivered to Kafka when the code runs under spark-submit.
Without the flush, nothing is sent.
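A minimal sketch of the question's script with that fix applied, assuming kafka-python on Python 3 (so the JSON string is encoded to bytes before sending). The helper names build_status_message, send_and_flush and main are hypothetical, and the broker address placeholder is kept exactly as in the question:

```python
import datetime
import json


def build_status_message(country: str, status: str = "1") -> bytes:
    """Serialize the status record from the question to UTF-8 JSON bytes."""
    message = {
        "country": country,
        "end": datetime.datetime.now().isoformat(),
        "status": status,
    }
    return json.dumps(message).encode("utf-8")


def send_and_flush(producer, topic: str, payload: bytes, timeout=None) -> None:
    """Queue one record, then block until buffered records are delivered.

    send() only appends the record to the producer's in-memory buffer and
    returns; flush() waits for the buffered records to be sent before the
    script continues (bounded by the optional timeout, in seconds).
    """
    producer.send(topic, payload)
    producer.flush(timeout=timeout)


def main() -> None:
    # Imported here so the helpers above can be exercised without kafka-python.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers="XXX.XX.XXX.XXX:9092")
    try:
        send_and_flush(producer, "testtopic", build_status_message("es"))
    finally:
        # Release the producer's connections and background sender thread.
        producer.close()
```

Calling main() at the end of the script passed to spark-submit sends one record and waits in flush() before the driver exits. Taking the producer as an argument keeps the send-then-flush logic testable without a running broker.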





Curiously, though, producer.flush() is not needed when the same code runs in the pyspark shell.





The producer polls for a batch of messages from the batch queue, one batch per partition. A batch is ready when one of the following is true:



- batch.size is reached. Note: larger batches typically compress better and give higher throughput, but they add latency.
- linger.ms (the time-based batching threshold) is reached. Note: there is no simple guideline for setting linger.ms; test settings against your specific use case. For small events (100 bytes or less), this setting does not appear to have much impact.
- Another batch destined for the same broker is ready.
- The producer calls flush() or close().
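The four readiness conditions above can be modeled as a single predicate. This is a simplified, hypothetical sketch for illustration only, not kafka-python's internal code. The batch_size and linger_ms parameters mirror the producer's batch.size and linger.ms settings (kafka-python exposes them as keyword arguments of the same names on KafkaProducer); the default values below are illustrative assumptions:

```python
from dataclasses import dataclass


@dataclass
class BatchState:
    """Hypothetical snapshot of one per-partition batch."""
    size_bytes: int          # bytes accumulated in the batch so far
    age_ms: float            # time since the first record was appended
    other_batch_ready: bool  # another batch for the same broker is ready
    flushing: bool           # flush() or close() has been called


def batch_is_ready(batch: BatchState, batch_size: int = 16384,
                   linger_ms: float = 0.0) -> bool:
    """Return True if any one of the readiness conditions holds."""
    return (
        batch.size_bytes >= batch_size   # batch.size reached
        or batch.age_ms >= linger_ms     # linger.ms reached
        or batch.other_batch_ready       # piggyback on a ready batch
        or batch.flushing                # explicit flush() / close()
    )
```

With linger_ms=100, a 60-byte batch that is 10 ms old is held back by this model, but the same batch becomes ready as soon as flushing is set. That is why an explicit flush() is enough to push out a single small message.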





