
The StreamingQuery object returned by start() can be used to monitor a running query. It exposes several attributes for this, including recentProgress, lastProgress, and status.
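To show how these attributes relate: in PySpark, recentProgress is a list of dict-like progress reports, lastProgress is the newest of them (or None before the first batch finishes), and status is a small dict describing what the query is doing right now. The sketch below works on plain dicts using real progress-report field names (batchId, numInputRows, processedRowsPerSecond). The helper `summarize_recent` is hypothetical and the sample values are hand-written, not real Spark output.

```python
from typing import Any, Dict, List, Optional


def summarize_recent(recent: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Aggregate a list shaped like StreamingQuery.recentProgress (hypothetical helper)."""
    # Only batches that actually read rows are interesting for throughput.
    active = [p for p in recent if p.get("numInputRows", 0) > 0]
    total_rows = sum(p["numInputRows"] for p in active)
    rates = [p["processedRowsPerSecond"] for p in active if "processedRowsPerSecond" in p]
    avg_rate = sum(rates) / len(rates) if rates else 0.0
    # lastProgress corresponds to the newest entry, or None if there is none yet.
    last: Optional[Dict[str, Any]] = recent[-1] if recent else None
    return {
        "batches": len(recent),
        "non_empty_batches": len(active),
        "total_input_rows": total_rows,
        "avg_processed_rows_per_sec": avg_rate,
        "last_batch_id": last["batchId"] if last else None,
    }


# Hand-written sample in the shape of recentProgress (not real Spark output).
sample = [
    {"batchId": 0, "numInputRows": 0, "processedRowsPerSecond": 0.0},
    {"batchId": 1, "numInputRows": 12, "processedRowsPerSecond": 30.0},
    {"batchId": 2, "numInputRows": 8, "processedRowsPerSecond": 20.0},
]
print(summarize_recent(sample))
```

With a live query, the same function could be called as `summarize_recent(query.recentProgress)`.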
A complete example:
#!/usr/bin/env python3
# Socket word count that prints lastProgress and status while the query runs.
from pprint import pprint
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

if __name__ == "__main__":
    spark = (
        SparkSession.builder
        .appName("StructuredNetworkWordCount")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("WARN")

    # Read lines of text from a socket (e.g. one opened with `nc -lk 9999`).
    lines = (
        spark.readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .load()
    )

    # Split each line into words, one word per row, then count each word.
    words = lines.select(explode(split(lines.value, " ")).alias("word"))
    wordCounts = words.groupBy("word").count()

    # Print the full result table to the console, triggering every 8 seconds.
    query = (
        wordCounts.writeStream
        .outputMode("complete")
        .format("console")
        .queryName("write_to_console")
        .trigger(processingTime="8 seconds")
        .start()
    )

    # Poll the StreamingQuery handle until the query stops.
    while query.isActive:
        progress = query.lastProgress  # None until the first batch completes
        if progress and progress["numInputRows"] > 0:
            pprint(progress)
            pprint(query.status)
        time.sleep(5)
Feel free to share; when reposting, please credit the source: 内存溢出