water8394 / flink-recommandsystem-demo
:helicopter::rocket: A real-time product recommendation system built on Flink. Flink computes product popularity and caches it in Redis, analyzes the log stream, and writes profile tags and real-time records to HBase. When a user requests recommendations, the popularity ranking is re-sorted by the user's profile, the collaborative-filtering and tag-based recommendation modules attach related products to every item on the resulting list, and the new list is returned to the user.
Hi, I'd like to ask about an error at the packaging step: the build fails and I can't produce the jar. How do I fix this?
All six scheduled tasks consume from the "con" topic, but I've searched the entire project and cannot find anywhere that produces messages to "con". The only place that seeds Kafka data is generator.sh, and there you produce msg to the "log" topic. Is that a typo, and should the topic actually be "con"?
contact records start at 3, while product records start at 1.
-- ----------------------------
-- Records of contact
-- ----------------------------
INSERT INTO `contact` VALUES (3, 'http://img01.02d.com/Public/Upload/image/20190713/5d29b8512a04f.jpg', 'BarieCat“柚屿”系列', '舒适的非离子材质融合充满复古韵味的混血花纹;虚化的深色边缘与瞳孔的轮廓完美融合;搭配低明度高显色的基色将酷感混血进行到底。', 134.00, 'Bariecat');
INSERT INTO `contact` VALUES (4, 'http://img01.02d.com/Public/Upload/image/20190713/5d297c0fa4f48.jpg', '溪悦creekeye呦呦灰_副本', '进口MPC高保湿型非离子,轻薄无感!', 89.00, '溪悦Creek eye');
Given that, this method throws an index exception once recommendation starts:
private List<ProductDto> fillProductDto(List<String> list, List<ContactEntity> contactEntities,
List<ProductEntity> productEntities, int topSize) {
List<ProductDto> ret = new ArrayList<>();
for (int i = 0; i < topSize; i++) {
String topId = list.get(i);
ProductDto dto = new ProductDto();
dto.setScore(TOP_SIZE + 1 - i);
for (int j = 0; j < topSize; j++) {
if (topId.equals(String.valueOf(contactEntities.get(j).getId()))) {
dto.setContact(contactEntities.get(j));
}
if (topId.equals(String.valueOf(productEntities.get(j).getProductId()))) {
dto.setProduct(productEntities.get(j));
}
}
ret.add(dto);
}
return ret;
}
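A possible fix, sketched here as a suggestion (it assumes java.util.Map and java.util.stream.Collectors are imported), is to index both entity lists by ID instead of walking them by position, so IDs that do not start at 1 still match and short lists don't overrun:

private List<ProductDto> fillProductDto(List<String> list, List<ContactEntity> contactEntities,
                                        List<ProductEntity> productEntities, int topSize) {
    // Index entities by their string ID once, instead of scanning by loop position.
    Map<String, ContactEntity> contactById = contactEntities.stream()
            .collect(Collectors.toMap(c -> String.valueOf(c.getId()), c -> c));
    Map<String, ProductEntity> productById = productEntities.stream()
            .collect(Collectors.toMap(p -> String.valueOf(p.getProductId()), p -> p));
    List<ProductDto> ret = new ArrayList<>();
    int limit = Math.min(topSize, list.size()); // guard against a top list shorter than topSize
    for (int i = 0; i < limit; i++) {
        String topId = list.get(i);
        ProductDto dto = new ProductDto();
        dto.setScore(TOP_SIZE + 1 - i);
        dto.setContact(contactById.get(topId)); // null if no contact row exists for this ID
        dto.setProduct(productById.get(topId));
        ret.add(dto);
    }
    return ret;
}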
Hi~ Great project; I've basically got the environment set up following your docs, but one thing in the code puzzles me.
flink-2-hbase has six tasks that pull data from the "con" and "history" topics, but in the web project I only see data sent to "con" (when clicking favorite, add-to-cart, and so on). Searching the whole project, I cannot find any code that produces to the "history" topic.
## UserHistoryTask.java
DataStreamSource<String> dataStream = env.addSource(new FlinkKafkaConsumer<String>("history", new SimpleStringSchema(), properties));
Where does the data in the history topic come from?
The flink-2-hbase project contains six data-cleaning tasks plus a scheduled collaborative-filtering task. Should each be packaged separately and run on Flink, or packaged separately and run in Docker?
When docker-compose.yml pulls the hbase image, I get ERROR: unauthorized: authentication required. How do I deal with that?
Your recommendation results are all computed offline. For real-time recommendation, shouldn't the collaborative filtering be done in real time on the Flink side?
public String map(LogEntity logEntity) throws Exception {
Action actionLastTime = state.value();
Action actionThisTime = new Action(logEntity.getAction(), logEntity.getTime().toString());
int times = 1;
// If the user has no prior action, initialize the state
if (actionLastTime == null) {
state.update(actionThisTime);
//actionLastTime = actionThisTime
saveToHBase(logEntity, 1);
}else{
times = getTimesByRule(actionLastTime, actionThisTime);
saveToHBase(logEntity, times);
}
//saveToHBase(logEntity, times);
if (actionThisTime.getType().equals("3")){
state.clear();
}
return null;
}
When clicking "view favorites" in the frontend, the backend logs a warning that the product record does not exist in HBase.
There is no state.update call anywhere in the UserHistoryWithInterestMapFunction class. Does this really run correctly for you?
Hello, I have a few questions I'd like to ask you.
Hello,
Thanks a lot for sharing this project. I have a few questions; I searched Stack Overflow for a long time without finding a solution.
flink-2-hbase/src/main/resources/config.properties contains the IP address 192.168.1.7:
kafka.bootstrap.servers=192.168.1.7:9092
kafka.zookeeper.connect=192.168.1.7:2181
but in docker-compose.yml the Kafka OUTSIDE listener is 192.168.56.103:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://192.168.56.103:9092
When trying to deploy everything on one machine, I changed the Kafka OUTSIDE listener to localhost:9092:
kafka:
container_name: kafka
image: wurstmeister/kafka:2.12-2.2.1
ports:
- "9092:9092"
depends_on:
- zookeeper
expose:
- "9093"
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_CREATE_TOPICS: "flink:1:1"
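(For a single-box setup like this, the client side presumably has to match the OUTSIDE listener, so the corresponding config.properties entries would be:)

kafka.bootstrap.servers=localhost:9092
kafka.zookeeper.connect=localhost:2181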
I then wrote a new Flink-to-Kafka consumer program in Java:
public class KalfkaApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
System.out.println(properties);
DataStreamSource<String> dataStream = env.addSource(new FlinkKafkaConsumer<String>("log", new SimpleStringSchema(), properties));
// dataStream.map(new LogMapFunction());
dataStream.rebalance().map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
@Override
public String map(String value) throws Exception {
return "Kafka and Flink1 says: " + value;
}
}).print();
env.execute("stout");
}
}
This program runs fine directly in the IDE, producing output like:
4> Kafka and Flink1 says: 315,315,1583225504,3
9> Kafka and Flink1 says: 302,302,1583225491,2
10> Kafka and Flink1 says: 304,304,1583225493,1
1> Kafka and Flink1 says: 309,309,1583225498,3
7> Kafka and Flink1 says: 298,298,1583225487,1
12> Kafka and Flink1 says: 308,308,1583225497,2
2> Kafka and Flink1 says: 311,311,1583225500,2
3> Kafka and Flink1 says: 313,313,1583225502,1
6> Kafka and Flink1 says: 319,319,1583225508,1
7> Kafka and Flink1 says: 321,321,1583225510,3
10> Kafka and Flink1 says: 326,326,1583225515,2
9> Kafka and Flink1 says: 324,324,1583225513,3
11> Kafka and Flink1 says: 328,328,1583225517,1
12> Kafka and Flink1 says: 330,330,1583225519,3
Process finished with exit code -1
But after packaging it as a jar and docker cp-ing it into Flink per your tutorial, it fails with:
root@db0dba1666b2:/opt/flink# flink run /opt/flink-2-hbase-1.0-SNAPSHOT.jar
{bootstrap.servers=localhost:9092, group.id=test, zookeeper.connect=localhost:2181}
Job has been submitted with JobID 144b101ff9964cf7ba109cfd95e7f005
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 144b101ff9964cf7ba109cfd95e7f005)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 144b101ff9964cf7ba109cfd95e7f005)
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at com.demo.task.KalfkaApp.main(KalfkaApp.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 144b101ff9964cf7ba109cfd95e7f005)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
I looked this error up online; my guess is that Flink can't reach ZooKeeper and therefore can't fetch the metadata? Some posts say KAFKA_LISTENER_SECURITY_PROTOCOL_MAP must not be SSL, but your setup is already PLAINTEXT, so I'm stuck again...
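One way to narrow this down (a standalone sketch, not part of the project; it assumes kafka-clients is on the classpath) is to run a bare AdminClient from wherever the job actually executes. If it times out inside the Flink container but succeeds on the host, the advertised OUTSIDE://localhost:9092 listener is the likely cause, because localhost inside the Flink container resolves to that container rather than to the Kafka broker:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import java.util.Properties;

// Minimal broker-reachability probe.
public class KafkaProbe {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Same address the failing job uses; try "kafka:9093" to test the INSIDE listener.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000");
        try (AdminClient admin = AdminClient.create(props)) {
            System.out.println("Topics: " + admin.listTopics().names().get());
        }
    }
}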
Thanks!
The hourly log-ingestion statistic is hard-coded as 69 in the code, while the real value is supposed to come from the meter key in Redis. I can't find where that value is written, in either the web app or the backend service.
Please advise, thanks.
According to your steps, HBase cannot be accessed
https://xinze.fun/2019/11/19/使用Docker部署Flink大数据项目/
@CheckChe0803 @vector4wang
I can never connect to HBase with the project's HbaseClient; it just waits forever. The docker-compose file shows hbase depending on an external zk container (3.4.6), but the HBase web UI reports zk client version 3.4.10. Could the mismatch matter?
zookeeper:
container_name: zk
image: wurstmeister/zookeeper:3.4.6
ports:
- "2181:2181"
hbase:
container_name: hbase
hostname: docker-linux
image: xinze0803/hbase
links:
- zookeeper
depends_on:
- zookeeper
logging:
driver: "none"
ports:
- "16010:16010"
- "8080:8080"
- "9090:9090"
- "16000:16000"
- "16020:16020"
- "16030:16030"
command: ["/wait-for-it.sh", "zookeeper:2181", "-t", "10", "--", "/usr/bin/supervisord"]
volumes:
- /opt/recommend/hbase_ini.sql:/opt/hbase_ini.sql
Thanks.
Hi, thanks a lot for writing this tutorial.
Reading your blog post, I couldn't find the port numbers for the frontend recommendation page or the backend data dashboard. docker-compose.yml doesn't seem to touch the web folder either. Do they need to be started separately?
public class UserInterestTask {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = Property.getKafkaProperties("interest");
DataStreamSource<String> dataStream = env.addSource(new FlinkKafkaConsumer<String>("con", new SimpleStringSchema(), properties));
dataStream.map(new GetLogFunction())
//Interest should be per user per product, so I think the key here should also include "productId" --> .keyBy("userId", "productId")
// Otherwise, as I read the original .keyBy("userId"), any two actions by the same user on different products within the time window (e.g. purchase - view < 100s) would count as one interest event, which seems wrong
.keyBy("userId", "productId")
.map(new UserHistoryWithInterestMapFunction());
env.execute("User Product History");
}
}
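If the composite key is the intended semantics, the same thing can be written with a key selector (a sketch; it assumes GetLogFunction emits LogEntity and that the entity exposes getUserId() and getProductId()):

// Hypothetical equivalent of .keyBy("userId", "productId")
.keyBy((LogEntity log) -> log.getUserId() + "_" + log.getProductId())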
Is there no code at all for the offline recommendations?
Boss, could you put out a standard deployment doc? Where did all these stars come from? If nobody can get it running, it's just wasted.
https://cloud.tencent.com/developer/article/1832231
I really don't get it: there's already a Docker setup, so why spin up another VM? Which data source ends up being used? As it stands, this demo seems broken.
public String map(LogEntity logEntity) throws Exception {
    Action actionLastTime = state.value();
    Action actionThisTime = new Action(logEntity.getAction(), logEntity.getTime().toString());
    int times = 1;
    // If the user has no prior action, initialize the state
    if (actionLastTime == null) {
        actionLastTime = actionThisTime;
        saveToHBase(logEntity, 1);
    }else{
        times = getTimesByRule(actionLastTime, actionThisTime);
    }
    saveToHBase(logEntity, times);
    // If the action is 3 (purchase), clear this key's state
    if (actionThisTime.getType().equals("3")){
        state.clear();
    }
    return null;
}
Is the state never updated? Shouldn't there be a state.update(actionThisTime)?
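For reference, a minimal sketch of that fix (assuming state is the function's ValueState<Action>): write the state on every event and only clear it after a purchase:

@Override
public String map(LogEntity logEntity) throws Exception {
    Action actionLastTime = state.value();
    Action actionThisTime = new Action(logEntity.getAction(), logEntity.getTime().toString());
    int times = 1;
    if (actionLastTime != null) {
        times = getTimesByRule(actionLastTime, actionThisTime);
    }
    saveToHBase(logEntity, times);
    if ("3".equals(actionThisTime.getType())) {
        state.clear();                // a purchase ends the session for this key
    } else {
        state.update(actionThisTime); // the update the quoted code appears to be missing
    }
    return null;
}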
Could you share the dataset you used? Thanks.
This Docker image can't be found, so deployment fails. What's going on?
This has bothered me for two days and I can't find a fix, so I'm opening an issue.
The problem: after starting Kafka and then launching LogTask in IDEA, after a while Kafka first throws a NullPointerException:
2020-03-18 23:08:47 [WARN ](KafkaFetcher ) [TxId : , SpanId : ] [ET:,AN:,SN:,CN:,CI:] Error while closing Kafka consumer
java.lang.NullPointerException at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:282)
Then the log reports:
2020-03-18 23:08:47 [INFO ](ExecutionGraph ) [TxId : , SpanId : ] [ET:,AN:,SN:,CN:,CI:] Source: Custom Source -> Map (3/4) (63c794e9466b43ba2fb7c09f3114844b) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition con-0 could be determined
and Flink finally throws a JobManager is shutting down exception:
org.apache.flink.util.FlinkException: JobManager is shutting down.
at org.apache.flink.runtime.jobmaster.JobMaster.postStop(JobMaster.java:367)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.postStop(FencedAkkaRpcActor.java:40)
at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95)
at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
at akka.actor.ActorCell.terminate(ActorCell.scala:374)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Environment: Linux, with Docker set up and deployed on the same machine; the Task programs run from IDEA on that machine.
The Kafka OUTSIDE setting in config.properties and docker-compose.yml is unchanged; the modified docker-compose.yml is as follows:
version: '2.1'
services:
zookeeper:
container_name: zk
image: wurstmeister/zookeeper:3.4.6
ports:
- "2181:2181"
kafka:
container_name: kafka
image: wurstmeister/kafka:2.12-2.2.1
ports:
- "9092:9092"
depends_on:
- zookeeper
expose:
- "9093"
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://192.168.56.103:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_CREATE_TOPICS: "flink:1:1"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- /home/baith/project/idea/idea/git/flink-recommand-system/resources/docker/docker-compose/generator.sh:/opt/generator.sh
hbase:
container_name: hbase
hostname: localhost
image: xinze0803/hbase
links:
- zookeeper
depends_on:
- zookeeper
logging:
driver: "none"
ports:
- "16010:16010"
- "8080:8080"
- "9090:9090"
- "16000:16000"
- "16020:16020"
- "16030:16030"
command: ["/wait-for-it.sh", "zookeeper:2181", "-t", "10", "--", "/usr/bin/supervisord"]
volumes:
- /home/baith/project/idea/idea/git/flink-recommand-system/resources/docker/docker-compose/hbase_ini.sql:/opt/hbase_ini.sql
mysql:
image: mysql
command: --default-authentication-plugin=mysql_native_password
restart: always
container_name: mysql
environment:
- MYSQL_ROOT_PASSWORD=123456
- MYSQL_ROOT_HOST=%
volumes:
- /home/baith/project/idea/idea/git/flink-recommand-system/resources/docker/docker-compose/contact.sql:/opt/contact.sql
redis:
image: redis
container_name: redis
ports:
- "6379:6379"
entrypoint: redis-server --appendonly yes
restart: always
jobmanager:
image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
expose:
- "6123"
ports:
- "8081:8081"
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
expose:
- "6121"
- "6122"
depends_on:
- jobmanager
command: taskmanager
links:
- "jobmanager:jobmanager"
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
I configured the IPs and ports of the services deployed above in both the flink-2-hbase and web services. Then I ran mvn clean install in the flink-2-hbase root to build the project and install it into the local repository, started each task under the task directory (just right-click and run in IDEA), and then started SchedulerJob to periodically compute the scores needed by collaborative filtering and the user profiles.
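(Roughly, as commands; paths are assumed:)

cd flink-2-hbase
mvn clean install    # build and install the artifact into the local Maven repository
# then run each class under the task package from IDEA, and finally SchedulerJob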
Table 'u_interest' was not found, got: u_history.
As the title says~ 😢
After bringing the services up with docker-compose, the test part can't connect to HBase and stalls here:
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:java.library.path=/Users/hc360/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:java.io.tmpdir=/var/folders/2x/0v69mg453f5f1vssyxtptpf40000gn/T/
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:java.compiler=
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:os.name=Mac OS X
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:os.arch=x86_64
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:os.version=10.15.6
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:user.name=hc360
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:user.home=/Users/hc360
2020-09-11 14:34:27 INFO ZooKeeper:100 - Client environment:user.dir=/Users/hc360/flink-recommandSystem-demo/flink-2-hbase
2020-09-11 14:34:27 INFO ZooKeeper:438 - Initiating client connection, connectString=localhost:2181 sessionTimeout=90000 watcher=hconnection-0x65b104b90x0, quorum=localhost:2181, baseZNode=/hbase
2020-09-11 14:34:27 INFO ClientCnxn:975 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2020-09-11 14:34:27 INFO ClientCnxn:852 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2020-09-11 14:34:27 INFO ClientCnxn:1235 - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x1747aab89a00020, negotiated timeout = 40000
I've been looking at Flink recently too and want to build a simple real-time recommendation system; after a long search I landed here and plan to get this project running locally!!!
Rooting for you; hoping for setup steps~~~
Could you set up a WeChat group for discussing the related tech?
I'm having problems running it; how can I get in touch?
Is any expert willing to teach me? I'll pay.