
- 背景
- 复现
- 代码
- 内存信息
- 总结
- 解决方案
- pipe详解
- 介绍:
- pipe 容量
Fate On Spark在测试大文件上传的时候,遇到当文件大小超过fate所在机器内存大小的时候,上传任务最终状态为canceled,经过多次测试发现整个文件其实是已经完整上传到HDFS上,但是Fate在文件上传完毕之后会调用count()函数,当fate底层storage为hdfs时:count()执行逻辑其实就是将刚刚上传到hdfs的数据重新read一遍,读一行就做一下计数。问题就在于fate读取hdfs上数据使用的方法上面:python的第三方库pyarrow,
复现 代码机器信息: 32核128G
hadoop文件 323.03G
import io
from pyarrow import fs
# path = "hdfs://fate-cluster//fate/input_data/experiment/student_homo_host"
path = "hdfs://fate-cluster//fate/input_data/experiment/idCard_1000_tag_10000w_*_20w_guest"
try:
from pyarrow import HadoopFileSystem
HadoopFileSystem(path)
except Exception as e:
print("error")
_hdfs_client = fs.HadoopFileSystem.from_uri(path)
def _as_generator():
with io.TextIOWrapper(buffer=_hdfs_client.open_input_stream, encoding="utf-8", newline=hdfs_utils.newline,
line_buffering=True, write_through=True) as reader:
print("-----准备流结束------")
for line in reader:
yield line
def count():
count = 0
for _ in _as_generator():
count += 1
return count
if __name__ == '__main__':
print(count())
内存信息
总结
直接用python直接调用时会出现Killed的系统异常信息(该信息无法在python层面被捕获)。运行在spark环境中时不会有系统的kill信息显示,生产环境中出现问题后并不好排查。目前推测HadoopFileSystem.open_input_stream()打开的文件流是整个文件大小。
解决方案目前使用hadoop命令来替换之前的流处理,通过子进程执行hadoop fs -cat ${path}返回的标准输出来逐行读取内容。
with subprocess.Popen(["hadoop", "fs", "-cat", self._path], universal_newlines=True, stdout=subprocess.PIPE) as pipe:
for _ in pipe.stdout:
yield _
subprocess.Popen 是异步、subprocess.run是同步,
pipe详解linux pipe详解
介绍:Pipe 就是一个管道,一端可以将数据写入到pi pe,一端可以将数据从管道中取出
读写情况分析pipe IO and FIFOS
- 消费者读取一个空的pi pe,这个消费者会一直阻塞,知道pi pe中数据可用
- 生产者向一个满的pi pe中写数据,这个生产者也会一直阻塞,直到消费者读取了数据并足够消费者写入(非阻塞IO也可以设置O_NONBLOCK)
- 所有指向pi pe的生产者都已经关闭,访问此pi pe的消费者会会读取完毕整个文件,没有读取到则return 0
- 所有指向pi pe的消费者都已经关闭,访问此pi pe的生产者将生成一个SIGPIPE信号,如果忽略这个信号,则会报错,错误为EPIPE
- Linux 2.6.11之前pi pe大小和页大小一致4KB
- Linux 2.6.11之后pi pe容量为16个页大小为64KB
- Linux2.6.35之后可以查看和设置F_GETPIPE_SZ and F_SETPIPE_SZ
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)