PySpark: parse comma-delimited Kafka data into columns using from_csv

I am new to Structured Streaming with Kafka. I am trying to convert comma-delimited data from a Kafka topic into a DataFrame in PySpark using a schema and from_csv.

from pyspark.sql.functions import from_csv
from pyspark.sql.types import StructType, StructField, StringType, LongType

kafkaDataSchema = StructType([
  StructField("sid", StringType()), StructField("timestamp", LongType()),
  StructField("sensor", StringType()), StructField("value", StringType()),
])
kafkaStream = spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", self.config.get('kafka-config', 'bootstrap-servers')) \
            .option("subscribe", self.config.get('kafka-config', 'topic-list-input')) \
            .option("startingOffsets", self.config.get('kafka-config', 'startingOffsets')) \
            .load() \
            .selectExpr("CAST(value AS STRING)")
formattedStream = kafkaStream.select(from_csv(kafkaStream.value, kafkaDataSchema))

I am getting the following error:

Traceback (most recent call last):
  File "main.py", line 43, in <module>
    formattedStream = KafkaSource.readData(spark)
  File "src.zip/src/main/sources/KafkaSource.py", line 31, in readData
  File "src.zip/src/main/sources/KafkaSource.py", line 36, in formatKafkaData
  File "/spark-3.1.1-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/functions.py", line 4082, in from_csv
TypeError: schema argument should be a column or string

How can I solve the issue?
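
From the traceback it looks like from_csv in PySpark 3.1 only accepts the schema as a DDL-format string (or a Column), not a StructType. My guess, untested, is that passing the schema as a string and then flattening the resulting struct would work, along the lines of:

# Assumption: schema expressed as a DDL string instead of a StructType
schemaDDL = "sid STRING, timestamp BIGINT, sensor STRING, value STRING"
formattedStream = kafkaStream.select(
    from_csv(kafkaStream.value, schemaDDL).alias("data")  # parse the CSV string
).select("data.*")  # expand the struct into top-level columns

Is that the right approach, or is there a way to reuse the existing StructType directly (e.g. via kafkaDataSchema.simpleString())?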
